[PATCH 09/10] memory: implement RamDiscardManager multi-source aggregation

marcandre.lureau@redhat.com posted 10 patches 5 days, 12 hours ago
Maintainers: Paolo Bonzini <pbonzini@redhat.com>, Peter Xu <peterx@redhat.com>, Fabiano Rosas <farosas@suse.de>, Mark Kanda <mark.kanda@oracle.com>, Ben Chaney <bchaney@akamai.com>, Alex Williamson <alex@shazbot.org>, "Cédric Le Goater" <clg@redhat.com>, David Hildenbrand <david@kernel.org>, "Michael S. Tsirkin" <mst@redhat.com>, "Philippe Mathieu-Daudé" <philmd@linaro.org>
[PATCH 09/10] memory: implement RamDiscardManager multi-source aggregation
Posted by marcandre.lureau@redhat.com 5 days, 12 hours ago
From: Marc-André Lureau <marcandre.lureau@redhat.com>

Refactor RamDiscardManager to aggregate multiple RamDiscardSource
instances. This enables scenarios where multiple components (e.g.,
virtio-mem and RamBlockAttributes) can coordinate memory discard
state for the same memory region.

The aggregation uses:
- Populated: ALL sources populated
- Discarded: ANY source discarded

When a source is added with existing listeners, they are notified
about regions that become discarded. When a source is removed,
listeners are notified about regions that become populated.

Signed-off-by: Marc-André Lureau <marcandre.lureau@redhat.com>
---
 include/system/ram-discard-manager.h | 139 +++++++--
 hw/virtio/virtio-mem.c               |   8 +-
 system/memory.c                      |  15 +-
 system/ram-block-attributes.c        |   6 +-
 system/ram-discard-manager.c         | 434 ++++++++++++++++++++++++---
 5 files changed, 527 insertions(+), 75 deletions(-)

diff --git a/include/system/ram-discard-manager.h b/include/system/ram-discard-manager.h
index b188e09a30f..1c5a6f55833 100644
--- a/include/system/ram-discard-manager.h
+++ b/include/system/ram-discard-manager.h
@@ -206,28 +206,96 @@ struct RamDiscardSourceClass {
  * becoming discarded in a different granularity than it was populated and the
  * other way around.
  */
+
+typedef struct RamDiscardSourceEntry RamDiscardSourceEntry;
+
+struct RamDiscardSourceEntry {
+    RamDiscardSource *rds;
+    QLIST_ENTRY(RamDiscardSourceEntry) next;
+};
+
 struct RamDiscardManager {
     Object parent;
 
-    RamDiscardSource *rds;
-    MemoryRegion *mr;
+    struct MemoryRegion *mr;
+    QLIST_HEAD(, RamDiscardSourceEntry) source_list;
+    uint64_t min_granularity;
     QLIST_HEAD(, RamDiscardListener) rdl_list;
 };
 
-RamDiscardManager *ram_discard_manager_new(MemoryRegion *mr,
-                                           RamDiscardSource *rds);
+RamDiscardManager *ram_discard_manager_new(MemoryRegion *mr);
+
+/**
+ * ram_discard_manager_add_source:
+ *
+ * Register a #RamDiscardSource with the #RamDiscardManager. The manager
+ * aggregates state from all registered sources using AND semantics: a region
+ * is considered populated only if ALL sources report it as populated.
+ *
+ * If listeners are already registered, they will be notified about any
+ * regions that become discarded due to adding this source. Specifically,
+ * for each region that the new source reports as discarded, if all other
+ * sources reported it as populated, listeners receive a discard notification.
+ *
+ * If any listener rejects the notification (returns an error), previously
+ * notified listeners are rolled back with populate notifications and the
+ * source is not added.
+ *
+ * @rdm: the #RamDiscardManager
+ * @source: the #RamDiscardSource to add
+ *
+ * Returns: 0 on success, -EBUSY if @source is already registered, or a
+ *          negative error code if a listener rejected the state change.
+ */
+int ram_discard_manager_add_source(RamDiscardManager *rdm,
+                                   RamDiscardSource *source);
+
+/**
+ * ram_discard_manager_del_source:
+ *
+ * Unregister a #RamDiscardSource from the #RamDiscardManager.
+ *
+ * If listeners are already registered, they will be notified about any
+ * regions that become populated due to removing this source. Specifically,
+ * for each region that the removed source reported as discarded, if all
+ * remaining sources report it as populated, listeners receive a populate
+ * notification.
+ *
+ * If any listener rejects the notification (returns an error), previously
+ * notified listeners are rolled back with discard notifications and the
+ * source is not removed.
+ *
+ * @rdm: the #RamDiscardManager
+ * @source: the #RamDiscardSource to remove
+ *
+ * Returns: 0 on success, -ENOENT if @source is not registered, or a
+ *          negative error code if a listener rejected the state change.
+ */
+int ram_discard_manager_del_source(RamDiscardManager *rdm,
+                                   RamDiscardSource *source);
+
 
 uint64_t ram_discard_manager_get_min_granularity(const RamDiscardManager *rdm,
                                                  const MemoryRegion *mr);
 
+/**
+ * ram_discard_manager_is_populated:
+ *
+ * Check if the given memory region section is populated.
+ * If the manager has no sources, it is considered populated.
+ *
+ * @rdm: the #RamDiscardManager
+ * @section: the #MemoryRegionSection to check
+ *
+ * Returns: true if the section is populated, false otherwise.
+ */
 bool ram_discard_manager_is_populated(const RamDiscardManager *rdm,
                                       const MemoryRegionSection *section);
 
 /**
  * ram_discard_manager_replay_populated:
  *
- * A wrapper to call the #RamDiscardSourceClass.replay_populated callback
- * of the #RamDiscardSource sources.
+ * Call @replay_fn on regions that are populated in all sources.
  *
  * @rdm: the #RamDiscardManager
  * @section: the #MemoryRegionSection
@@ -244,8 +312,7 @@ int ram_discard_manager_replay_populated(const RamDiscardManager *rdm,
 /**
  * ram_discard_manager_replay_discarded:
  *
- * A wrapper to call the #RamDiscardSourceClass.replay_discarded callback
- * of the #RamDiscardSource sources.
+ * Call @replay_fn on regions that are discarded in any sources.
  *
  * @rdm: the #RamDiscardManager
  * @section: the #MemoryRegionSection
@@ -266,31 +333,61 @@ void ram_discard_manager_register_listener(RamDiscardManager *rdm,
 void ram_discard_manager_unregister_listener(RamDiscardManager *rdm,
                                              RamDiscardListener *rdl);
 
-/*
- * Note: later refactoring should take the source into account and the manager
- *       should be able to aggregate multiple sources.
+/**
+ * ram_discard_manager_notify_populate:
+ *
+ * Notify listeners that a region is about to be populated by a source.
+ * For multi-source aggregation, only notifies when all sources agree
+ * the region is populated (intersection).
+ *
+ * @rdm: the #RamDiscardManager
+ * @source: the #RamDiscardSource that is populating
+ * @offset: offset within the memory region
+ * @size: size of the region being populated
+ *
+ * Returns 0 on success, or a negative error if any listener rejects.
  */
 int ram_discard_manager_notify_populate(RamDiscardManager *rdm,
+                                        RamDiscardSource *source,
                                         uint64_t offset, uint64_t size);
 
-/*
- * Note: later refactoring should take the source into account and the manager
- *       should be able to aggregate multiple sources.
+/**
+ * ram_discard_manager_notify_discard:
+ *
+ * Notify listeners that a region has been discarded by a source.
+ * For multi-source aggregation, always notifies immediately
+ * (union semantics - any source discarding makes region discarded).
+ *
+ * @rdm: the #RamDiscardManager
+ * @source: the #RamDiscardSource that is discarding
+ * @offset: offset within the memory region
+ * @size: size of the region being discarded
  */
 void ram_discard_manager_notify_discard(RamDiscardManager *rdm,
+                                        RamDiscardSource *source,
                                         uint64_t offset, uint64_t size);
 
-/*
- * Note: later refactoring should take the source into account and the manager
- *       should be able to aggregate multiple sources.
+/**
+ * ram_discard_manager_notify_discard_all:
+ *
+ * Notify listeners that all regions have been discarded by a source.
+ *
+ * @rdm: the #RamDiscardManager
+ * @source: the #RamDiscardSource that is discarding
  */
-void ram_discard_manager_notify_discard_all(RamDiscardManager *rdm);
+void ram_discard_manager_notify_discard_all(RamDiscardManager *rdm,
+                                            RamDiscardSource *source);
 
-/*
+/**
+ * ram_discard_manager_replay_populated_to_listeners:
+ *
  * Replay populated sections to all registered listeners.
+ * For multi-source aggregation, only replays regions where all sources
+ * are populated (intersection).
  *
- * Note: later refactoring should take the source into account and the manager
- *       should be able to aggregate multiple sources.
+ * @rdm: the #RamDiscardManager
+ *
+ * Returns 0 on success, or a negative error if any notification failed.
  */
 int ram_discard_manager_replay_populated_to_listeners(RamDiscardManager *rdm);
 
diff --git a/hw/virtio/virtio-mem.c b/hw/virtio/virtio-mem.c
index ec165503205..185d04d498c 100644
--- a/hw/virtio/virtio-mem.c
+++ b/hw/virtio/virtio-mem.c
@@ -330,7 +330,8 @@ static void virtio_mem_notify_unplug(VirtIOMEM *vmem, uint64_t offset,
 {
     RamDiscardManager *rdm = memory_region_get_ram_discard_manager(&vmem->memdev->mr);
 
-    ram_discard_manager_notify_discard(rdm, offset, size);
+    ram_discard_manager_notify_discard(rdm, RAM_DISCARD_SOURCE(vmem),
+                                       offset, size);
 }
 
 static int virtio_mem_notify_plug(VirtIOMEM *vmem, uint64_t offset,
@@ -338,7 +339,8 @@ static int virtio_mem_notify_plug(VirtIOMEM *vmem, uint64_t offset,
 {
     RamDiscardManager *rdm = memory_region_get_ram_discard_manager(&vmem->memdev->mr);
 
-    return ram_discard_manager_notify_populate(rdm, offset, size);
+    return ram_discard_manager_notify_populate(rdm, RAM_DISCARD_SOURCE(vmem),
+                                               offset, size);
 }
 
 static void virtio_mem_notify_unplug_all(VirtIOMEM *vmem)
@@ -349,7 +351,7 @@ static void virtio_mem_notify_unplug_all(VirtIOMEM *vmem)
         return;
     }
 
-    ram_discard_manager_notify_discard_all(rdm);
+    ram_discard_manager_notify_discard_all(rdm, RAM_DISCARD_SOURCE(vmem));
 }
 
 static bool virtio_mem_is_range_plugged(const VirtIOMEM *vmem,
diff --git a/system/memory.c b/system/memory.c
index 8b46cb87838..8a4cb7b59ac 100644
--- a/system/memory.c
+++ b/system/memory.c
@@ -2109,21 +2109,22 @@ int memory_region_add_ram_discard_source(MemoryRegion *mr,
                                          RamDiscardSource *source)
 {
     g_assert(memory_region_is_ram(mr));
-    if (mr->rdm) {
-        return -EBUSY;
+
+    if (!mr->rdm) {
+        mr->rdm = ram_discard_manager_new(mr);
     }
 
-    mr->rdm = ram_discard_manager_new(mr, RAM_DISCARD_SOURCE(source));
-    return 0;
+    return ram_discard_manager_add_source(mr->rdm, source);
 }
 
 void memory_region_del_ram_discard_source(MemoryRegion *mr,
                                           RamDiscardSource *source)
 {
-    g_assert(mr->rdm->rds == source);
+    g_assert(mr->rdm);
+
+    ram_discard_manager_del_source(mr->rdm, source);
 
-    object_unref(mr->rdm);
-    mr->rdm = NULL;
+    /* if there is no source and no listener left, we could free rdm */
 }
 
 /* Called with rcu_read_lock held.  */
diff --git a/system/ram-block-attributes.c b/system/ram-block-attributes.c
index e921e09f5b3..e7db0e72e77 100644
--- a/system/ram-block-attributes.c
+++ b/system/ram-block-attributes.c
@@ -218,7 +218,8 @@ ram_block_attributes_notify_discard(RamBlockAttributes *attr,
 {
     RamDiscardManager *rdm = memory_region_get_ram_discard_manager(attr->ram_block->mr);
 
-    ram_discard_manager_notify_discard(rdm, offset, size);
+    ram_discard_manager_notify_discard(rdm, RAM_DISCARD_SOURCE(attr),
+                                       offset, size);
 }
 
 static int
@@ -227,7 +228,8 @@ ram_block_attributes_notify_populate(RamBlockAttributes *attr,
 {
     RamDiscardManager *rdm = memory_region_get_ram_discard_manager(attr->ram_block->mr);
 
-    return ram_discard_manager_notify_populate(rdm, offset, size);
+    return ram_discard_manager_notify_populate(rdm, RAM_DISCARD_SOURCE(attr),
+                                               offset, size);
 }
 
 int ram_block_attributes_state_change(RamBlockAttributes *attr,
diff --git a/system/ram-discard-manager.c b/system/ram-discard-manager.c
index 1c9ff7fda58..c48ac4c66d6 100644
--- a/system/ram-discard-manager.c
+++ b/system/ram-discard-manager.c
@@ -7,6 +7,7 @@
 
 #include "qemu/osdep.h"
 #include "qemu/error-report.h"
+#include "qemu/queue.h"
 #include "system/memory.h"
 
 static uint64_t ram_discard_source_get_min_granularity(const RamDiscardSource *rds,
@@ -49,28 +50,341 @@ static int ram_discard_source_replay_discarded(const RamDiscardSource *rds,
     return rdsc->replay_discarded(rds, section, replay_fn, opaque);
 }
 
-RamDiscardManager *ram_discard_manager_new(MemoryRegion *mr,
-                                           RamDiscardSource *rds)
+RamDiscardManager *ram_discard_manager_new(MemoryRegion *mr)
 {
     RamDiscardManager *rdm;
 
     rdm = RAM_DISCARD_MANAGER(object_new(TYPE_RAM_DISCARD_MANAGER));
-    rdm->rds = rds;
     rdm->mr = mr;
-    QLIST_INIT(&rdm->rdl_list);
     return rdm;
 }
 
+static void ram_discard_manager_update_granularity(RamDiscardManager *rdm)
+{
+    RamDiscardSourceEntry *entry;
+    uint64_t granularity = 0;
+
+    QLIST_FOREACH(entry, &rdm->source_list, next) {
+        uint64_t src_granularity;
+
+        src_granularity = ram_discard_source_get_min_granularity(entry->rds, rdm->mr);
+        g_assert(src_granularity != 0);
+        if (granularity == 0) {
+            granularity = src_granularity;
+        } else {
+            granularity = MIN(granularity, src_granularity);
+        }
+    }
+    rdm->min_granularity = granularity;
+}
+
+static RamDiscardSourceEntry *
+ram_discard_manager_find_source(RamDiscardManager *rdm, RamDiscardSource *rds)
+{
+    RamDiscardSourceEntry *entry;
+
+    QLIST_FOREACH(entry, &rdm->source_list, next) {
+        if (entry->rds == rds) {
+            return entry;
+        }
+    }
+    return NULL;
+}
+
+static int rdl_populate_cb(const MemoryRegionSection *section, void *opaque)
+{
+    RamDiscardListener *rdl = opaque;
+    MemoryRegionSection tmp = *rdl->section;
+
+    g_assert(section->mr == rdl->section->mr);
+
+    if (!memory_region_section_intersect_range(&tmp,
+                                               section->offset_within_region,
+                                               int128_get64(section->size))) {
+        return 0;
+    }
+
+    return rdl->notify_populate(rdl, &tmp);
+}
+
+static int rdl_discard_cb(const MemoryRegionSection *section, void *opaque)
+{
+    RamDiscardListener *rdl = opaque;
+    MemoryRegionSection tmp = *rdl->section;
+
+    g_assert(section->mr == rdl->section->mr);
+
+    if (!memory_region_section_intersect_range(&tmp,
+                                               section->offset_within_region,
+                                               int128_get64(section->size))) {
+        return 0;
+    }
+
+    rdl->notify_discard(rdl, &tmp);
+    return 0;
+}
+
+static bool rdm_is_all_populated_skip(const RamDiscardManager *rdm,
+                                      const MemoryRegionSection *section,
+                                      const RamDiscardSource *skip_source)
+{
+    RamDiscardSourceEntry *entry;
+
+    QLIST_FOREACH(entry, &rdm->source_list, next) {
+        if (skip_source && entry->rds == skip_source) {
+            continue;
+        }
+        if (!ram_discard_source_is_populated(entry->rds, section)) {
+            return false;
+        }
+    }
+    return true;
+}
+
+typedef struct SourceNotifyCtx {
+    RamDiscardManager *rdm;
+    RamDiscardListener *rdl;
+    RamDiscardSource *source; /* added or removed */
+} SourceNotifyCtx;
+
+/*
+ * Unified helper to replay regions based on populated state.
+ * If replay_populated is true: replay regions where ALL sources are populated.
+ * If replay_populated is false: replay regions where ANY source is discarded.
+ */
+static int replay_by_populated_state(const RamDiscardManager *rdm,
+                                     const MemoryRegionSection *section,
+                                     const RamDiscardSource *skip_source,
+                                     bool replay_populated,
+                                     ReplayRamDiscardState replay_fn,
+                                     void *user_opaque)
+{
+    uint64_t granularity = rdm->min_granularity;
+    uint64_t offset, end_offset;
+    uint64_t run_start = 0;
+    bool in_run = false;
+    int ret = 0;
+
+    if (QLIST_EMPTY(&rdm->source_list)) {
+        if (replay_populated) {
+            return replay_fn(section, user_opaque);
+        }
+        return 0;
+    }
+
+    g_assert(granularity != 0);
+
+    offset = section->offset_within_region;
+    end_offset = offset + int128_get64(section->size);
+
+    while (offset < end_offset) {
+        MemoryRegionSection subsection = {
+            .mr = section->mr,
+            .offset_within_region = offset,
+            .size = int128_make64(MIN(granularity, end_offset - offset)),
+        };
+        bool all_populated;
+        bool included;
+
+        all_populated = rdm_is_all_populated_skip(rdm, &subsection, skip_source);
+        included = replay_populated ? all_populated : !all_populated;
+
+        if (included) {
+            if (!in_run) {
+                run_start = offset;
+                in_run = true;
+            }
+        } else {
+            if (in_run) {
+                MemoryRegionSection run_section = {
+                    .mr = section->mr,
+                    .offset_within_region = run_start,
+                    .size = int128_make64(offset - run_start),
+                };
+                ret = replay_fn(&run_section, user_opaque);
+                if (ret) {
+                    return ret;
+                }
+                in_run = false;
+            }
+        }
+        if (granularity > end_offset - offset) {
+            break;
+        }
+        offset += granularity;
+    }
+
+    if (in_run) {
+        MemoryRegionSection run_section = {
+            .mr = section->mr,
+            .offset_within_region = run_start,
+            .size = int128_make64(end_offset - run_start),
+        };
+        ret = replay_fn(&run_section, user_opaque);
+    }
+
+    return ret;
+}
+
+static int add_source_check_discard_cb(const MemoryRegionSection *section,
+                                       void *opaque)
+{
+    SourceNotifyCtx *ctx = opaque;
+
+    return replay_by_populated_state(ctx->rdm, section, ctx->source, true,
+                                     rdl_discard_cb, ctx->rdl);
+}
+
+static int del_source_check_populate_cb(const MemoryRegionSection *section,
+                                        void *opaque)
+{
+    SourceNotifyCtx *ctx = opaque;
+
+    return replay_by_populated_state(ctx->rdm, section, ctx->source, true,
+                                     rdl_populate_cb, ctx->rdl);
+}
+
+int ram_discard_manager_add_source(RamDiscardManager *rdm,
+                                   RamDiscardSource *source)
+{
+    RamDiscardSourceEntry *entry;
+    RamDiscardListener *rdl, *rdl2;
+    int ret = 0;
+
+    if (ram_discard_manager_find_source(rdm, source)) {
+        return -EBUSY;
+    }
+
+    /*
+     * If there are existing listeners, notify them about regions that
+     * become discarded due to adding this source. Only notify for regions
+     * that were previously populated (all other sources agreed).
+     */
+    QLIST_FOREACH(rdl, &rdm->rdl_list, next) {
+        SourceNotifyCtx ctx = {
+            .rdm = rdm,
+            .rdl = rdl,
+            /* no need to set source */
+        };
+        ret = ram_discard_source_replay_discarded(source, rdl->section,
+                                                  add_source_check_discard_cb, &ctx);
+        if (ret) {
+            break;
+        }
+    }
+    if (ret) {
+        QLIST_FOREACH(rdl2, &rdm->rdl_list, next) {
+            SourceNotifyCtx ctx = {
+                .rdm = rdm,
+                .rdl = rdl2,
+            };
+            ram_discard_source_replay_discarded(source, rdl2->section,
+                                                del_source_check_populate_cb, &ctx);
+            if (rdl == rdl2) {
+                break;
+            }
+        }
+
+        return ret;
+    }
+
+    entry = g_new0(RamDiscardSourceEntry, 1);
+    entry->rds = source;
+    QLIST_INSERT_HEAD(&rdm->source_list, entry, next);
+
+    ram_discard_manager_update_granularity(rdm);
+
+    return ret;
+}
+
+int ram_discard_manager_del_source(RamDiscardManager *rdm,
+                                   RamDiscardSource *source)
+{
+    RamDiscardSourceEntry *entry;
+    RamDiscardListener *rdl, *rdl2;
+    int ret = 0;
+
+    entry = ram_discard_manager_find_source(rdm, source);
+    if (!entry) {
+        return -ENOENT;
+    }
+
+    /*
+     * If there are existing listeners, check if any regions become
+     * populated due to removing this source.
+     */
+    QLIST_FOREACH(rdl, &rdm->rdl_list, next) {
+        SourceNotifyCtx ctx = {
+            .rdm = rdm,
+            .rdl = rdl,
+            .source = source,
+        };
+        /* from the previously discarded regions, check if any regions become populated */
+        ret = ram_discard_source_replay_discarded(source, rdl->section,
+                                                  del_source_check_populate_cb, &ctx);
+        if (ret) {
+            break;
+        }
+    }
+    if (ret) {
+        QLIST_FOREACH(rdl2, &rdm->rdl_list, next) {
+            SourceNotifyCtx ctx = {
+                .rdm = rdm,
+                .rdl = rdl2,
+                .source = source,
+            };
+            ram_discard_source_replay_discarded(source, rdl2->section,
+                                                add_source_check_discard_cb, &ctx);
+            if (rdl == rdl2) {
+                break;
+            }
+        }
+
+        return ret;
+    }
+
+    QLIST_REMOVE(entry, next);
+    g_free(entry);
+    ram_discard_manager_update_granularity(rdm);
+    return ret;
+}
+
 uint64_t ram_discard_manager_get_min_granularity(const RamDiscardManager *rdm,
                                                  const MemoryRegion *mr)
 {
-    return ram_discard_source_get_min_granularity(rdm->rds, mr);
+    g_assert(mr == rdm->mr);
+    return rdm->min_granularity;
 }
 
+/*
+ * Aggregated query: returns true only if ALL sources report populated (AND).
+ */
 bool ram_discard_manager_is_populated(const RamDiscardManager *rdm,
                                       const MemoryRegionSection *section)
 {
-    return ram_discard_source_is_populated(rdm->rds, section);
+    RamDiscardSourceEntry *entry;
+
+    QLIST_FOREACH(entry, &rdm->source_list, next) {
+        if (!ram_discard_source_is_populated(entry->rds, section)) {
+            return false;
+        }
+    }
+    return true;
+}
+
+typedef struct ReplayCtx {
+    const RamDiscardManager *rdm;
+    ReplayRamDiscardState replay_fn;
+    void *user_opaque;
+} ReplayCtx;
+
+static int aggregated_replay_populated_cb(const MemoryRegionSection *section,
+                                          void *opaque)
+{
+    ReplayCtx *ctx = opaque;
+
+    return replay_by_populated_state(ctx->rdm, section, NULL, true,
+                                     ctx->replay_fn, ctx->user_opaque);
 }
 
 int ram_discard_manager_replay_populated(const RamDiscardManager *rdm,
@@ -78,8 +392,21 @@ int ram_discard_manager_replay_populated(const RamDiscardManager *rdm,
                                          ReplayRamDiscardState replay_fn,
                                          void *opaque)
 {
-    return ram_discard_source_replay_populated(rdm->rds, section,
-                                               replay_fn, opaque);
+    RamDiscardSourceEntry *first;
+    ReplayCtx ctx;
+
+    first = QLIST_FIRST(&rdm->source_list);
+    if (!first) {
+        return replay_fn(section, opaque);
+    }
+
+    ctx.rdm = rdm;
+    ctx.replay_fn = replay_fn;
+    ctx.user_opaque = opaque;
+
+    return ram_discard_source_replay_populated(first->rds, section,
+                                               aggregated_replay_populated_cb,
+                                               &ctx);
 }
 
 int ram_discard_manager_replay_discarded(const RamDiscardManager *rdm,
@@ -87,15 +414,21 @@ int ram_discard_manager_replay_discarded(const RamDiscardManager *rdm,
                                          ReplayRamDiscardState replay_fn,
                                          void *opaque)
 {
-    return ram_discard_source_replay_discarded(rdm->rds, section,
-                                               replay_fn, opaque);
+    /* No sources means nothing is discarded (all is considered populated) */
+    if (QLIST_EMPTY(&rdm->source_list)) {
+        return 0;
+    }
+
+    return replay_by_populated_state(rdm, section, NULL, false, replay_fn, opaque);
 }
 
 static void ram_discard_manager_initfn(Object *obj)
 {
     RamDiscardManager *rdm = RAM_DISCARD_MANAGER(obj);
 
+    QLIST_INIT(&rdm->source_list);
     QLIST_INIT(&rdm->rdl_list);
+    rdm->min_granularity = 0;
 }
 
 static void ram_discard_manager_finalize(Object *obj)
@@ -103,74 +436,91 @@ static void ram_discard_manager_finalize(Object *obj)
     RamDiscardManager *rdm = RAM_DISCARD_MANAGER(obj);
 
     g_assert(QLIST_EMPTY(&rdm->rdl_list));
+    g_assert(QLIST_EMPTY(&rdm->source_list));
 }
 
 int ram_discard_manager_notify_populate(RamDiscardManager *rdm,
+                                        RamDiscardSource *source,
                                         uint64_t offset, uint64_t size)
 {
     RamDiscardListener *rdl, *rdl2;
+    MemoryRegionSection section = {
+        .mr = rdm->mr,
+        .offset_within_region = offset,
+        .size = int128_make64(size),
+    };
     int ret = 0;
 
-    QLIST_FOREACH(rdl, &rdm->rdl_list, next) {
-        MemoryRegionSection tmp = *rdl->section;
+    g_assert(ram_discard_manager_find_source(rdm, source));
 
-        if (!memory_region_section_intersect_range(&tmp, offset, size)) {
-            continue;
-        }
-        ret = rdl->notify_populate(rdl, &tmp);
+    /*
+     * Only notify about regions that are populated in ALL sources.
+     * replay_by_populated_state checks all sources including the one that
+     * just populated.
+     */
+    QLIST_FOREACH(rdl, &rdm->rdl_list, next) {
+        ret = replay_by_populated_state(rdm, &section, NULL, true,
+                                        rdl_populate_cb, rdl);
         if (ret) {
             break;
         }
     }
 
     if (ret) {
-        /* Notify all already-notified listeners about discard. */
+        /*
+         * Rollback: notify discard for listeners we already notified,
+         * including the failing listener which may have been partially
+         * notified. Listeners must handle discard notifications for
+         * regions they didn't receive populate notifications for.
+         */
         QLIST_FOREACH(rdl2, &rdm->rdl_list, next) {
-            MemoryRegionSection tmp = *rdl2->section;
-
+            replay_by_populated_state(rdm, &section, NULL, true,
+                                      rdl_discard_cb, rdl2);
             if (rdl2 == rdl) {
                 break;
             }
-            if (!memory_region_section_intersect_range(&tmp, offset, size)) {
-                continue;
-            }
-            rdl2->notify_discard(rdl2, &tmp);
         }
     }
     return ret;
 }
 
 void ram_discard_manager_notify_discard(RamDiscardManager *rdm,
+                                        RamDiscardSource *source,
                                         uint64_t offset, uint64_t size)
 {
     RamDiscardListener *rdl;
-
+    MemoryRegionSection section = {
+        .mr = rdm->mr,
+        .offset_within_region = offset,
+        .size = int128_make64(size),
+    };
+
+    g_assert(ram_discard_manager_find_source(rdm, source));
+
+    /*
+     * Only notify about ranges that were aggregately populated before this
+     * source's discard. Since the source has already updated its state,
+     * we use replay_by_populated_state with this source skipped - it will
+     * replay only the ranges where all OTHER sources are populated.
+     */
     QLIST_FOREACH(rdl, &rdm->rdl_list, next) {
-        MemoryRegionSection tmp = *rdl->section;
-
-        if (!memory_region_section_intersect_range(&tmp, offset, size)) {
-            continue;
-        }
-        rdl->notify_discard(rdl, &tmp);
+        replay_by_populated_state(rdm, &section, source, true,
+                                  rdl_discard_cb, rdl);
     }
 }
 
-void ram_discard_manager_notify_discard_all(RamDiscardManager *rdm)
+void ram_discard_manager_notify_discard_all(RamDiscardManager *rdm,
+                                            RamDiscardSource *source)
 {
     RamDiscardListener *rdl;
 
+    g_assert(ram_discard_manager_find_source(rdm, source));
+
     QLIST_FOREACH(rdl, &rdm->rdl_list, next) {
         rdl->notify_discard(rdl, rdl->section);
     }
 }
 
-static int rdm_populate_cb(const MemoryRegionSection *section, void *opaque)
-{
-    RamDiscardListener *rdl = opaque;
-
-    return rdl->notify_populate(rdl, section);
-}
-
 void ram_discard_manager_register_listener(RamDiscardManager *rdm,
                                            RamDiscardListener *rdl,
                                            MemoryRegionSection *section)
@@ -182,8 +532,8 @@ void ram_discard_manager_register_listener(RamDiscardManager *rdm,
     rdl->section = memory_region_section_new_copy(section);
     QLIST_INSERT_HEAD(&rdm->rdl_list, rdl, next);
 
-    ret = ram_discard_source_replay_populated(rdm->rds, rdl->section,
-                                              rdm_populate_cb, rdl);
+    ret = ram_discard_manager_replay_populated(rdm, rdl->section,
+                                               rdl_populate_cb, rdl);
     if (ret) {
         error_report("%s: Replaying populated ranges failed: %s", __func__,
                      strerror(-ret));
@@ -208,8 +558,8 @@ int ram_discard_manager_replay_populated_to_listeners(RamDiscardManager *rdm)
     int ret = 0;
 
     QLIST_FOREACH(rdl, &rdm->rdl_list, next) {
-        ret = ram_discard_source_replay_populated(rdm->rds, rdl->section,
-                                                  rdm_populate_cb, rdl);
+        ret = ram_discard_manager_replay_populated(rdm, rdl->section,
+                                                   rdl_populate_cb, rdl);
         if (ret) {
             break;
         }
-- 
2.52.0