Adding and removing single objects in a loop is bad in terms of lock
contention and cache line accesses.
To implement batching, record the last object in a batch in the object
itself. This is trivially possible as hlists are strictly stacks. At a batch
boundary, when the first object is added to the list, the object stores a
pointer to itself in debug_obj::batch_last. When the next object is added
to the list, the batch_last pointer is retrieved from the first object
in the list and stored in the object to be added.

That means for batch processing the first object always has a pointer to
the last object in a batch, which allows moving batches in a cache-line
efficient way and reduces the lock hold time.
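For illustration, the push-side bookkeeping described above boils down to the
following (a simplified sketch derived from the pcpu_free() hunk below;
pool_push_obj() is a made-up name, not part of the patch):

static void pool_push_obj(struct obj_pool *pool, struct debug_obj *obj)
{
        if (!(pool->cnt % ODEBUG_BATCH_SIZE)) {
                /* First object of a new batch points to itself */
                obj->batch_last = &obj->node;
        } else {
                /* Inherit the batch end marker from the current list head */
                struct debug_obj *first;

                first = hlist_entry(pool->objects.first, typeof(*first), node);
                obj->batch_last = first->batch_last;
        }
        hlist_add_head(&obj->node, &pool->objects);
        pool->cnt++;
}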
Signed-off-by: Thomas Gleixner <tglx@linutronix.de>
---
lib/debugobjects.c | 61 +++++++++++++++++++++++++++++++++++++++--------------
1 file changed, 46 insertions(+), 15 deletions(-)
--- a/lib/debugobjects.c
+++ b/lib/debugobjects.c
@@ -149,18 +149,31 @@ static __always_inline bool pool_must_re
static bool pool_move_batch(struct obj_pool *dst, struct obj_pool *src)
{
- if (dst->cnt + ODEBUG_BATCH_SIZE > dst->max_cnt || !src->cnt)
+ struct hlist_node *last, *next_batch, *first_batch;
+ struct debug_obj *obj;
+
+ if (dst->cnt >= dst->max_cnt || !src->cnt)
return false;
- for (int i = 0; i < ODEBUG_BATCH_SIZE && src->cnt; i++) {
- struct hlist_node *node = src->objects.first;
+ first_batch = src->objects.first;
+ obj = hlist_entry(first_batch, typeof(*obj), node);
+ last = obj->batch_last;
+ next_batch = last->next;
- WRITE_ONCE(src->cnt, src->cnt - 1);
- WRITE_ONCE(dst->cnt, dst->cnt + 1);
+ /* Move the next batch to the front of the source pool */
+ src->objects.first = next_batch;
+ if (next_batch)
+ next_batch->pprev = &src->objects.first;
+
+ /* Add the extracted batch to the destination pool */
+ last->next = dst->objects.first;
+ if (last->next)
+ last->next->pprev = &last->next;
+ first_batch->pprev = &dst->objects.first;
+ dst->objects.first = first_batch;
- hlist_del(node);
- hlist_add_head(node, &dst->objects);
- }
+ WRITE_ONCE(src->cnt, src->cnt - ODEBUG_BATCH_SIZE);
+ WRITE_ONCE(dst->cnt, dst->cnt + ODEBUG_BATCH_SIZE);
return true;
}
@@ -182,16 +195,27 @@ static bool pool_push_batch(struct obj_p
static bool pool_pop_batch(struct hlist_head *head, struct obj_pool *src)
{
+ struct hlist_node *last, *next;
+ struct debug_obj *obj;
+
if (!src->cnt)
return false;
- for (int i = 0; src->cnt && i < ODEBUG_BATCH_SIZE; i++) {
- struct hlist_node *node = src->objects.first;
+ /* Move the complete list to the head */
+ hlist_move_list(&src->objects, head);
- WRITE_ONCE(src->cnt, src->cnt - 1);
- hlist_del(node);
- hlist_add_head(node, head);
- }
+ obj = hlist_entry(head->first, typeof(*obj), node);
+ last = obj->batch_last;
+ next = last->next;
+ /* Disconnect the batch from the list */
+ last->next = NULL;
+
+ /* Move the node after last back to the source pool. */
+ src->objects.first = next;
+ if (next)
+ next->pprev = &src->objects.first;
+
+ WRITE_ONCE(src->cnt, src->cnt - ODEBUG_BATCH_SIZE);
return true;
}
@@ -226,7 +250,7 @@ static struct debug_obj *pcpu_alloc(void
if (!pool_move_batch(pcp, &pool_global))
return NULL;
}
- obj_pool_used += pcp->cnt;
+ obj_pool_used += ODEBUG_BATCH_SIZE;
if (obj_pool_used > obj_pool_max_used)
obj_pool_max_used = obj_pool_used;
@@ -239,9 +263,16 @@ static struct debug_obj *pcpu_alloc(void
static void pcpu_free(struct debug_obj *obj)
{
struct obj_pool *pcp = this_cpu_ptr(&pool_pcpu);
+ struct debug_obj *first;
lockdep_assert_irqs_disabled();
+ if (!(pcp->cnt % ODEBUG_BATCH_SIZE)) {
+ obj->batch_last = &obj->node;
+ } else {
+ first = hlist_entry(pcp->objects.first, typeof(*first), node);
+ obj->batch_last = first->batch_last;
+ }
hlist_add_head(&obj->node, &pcp->objects);
pcp->cnt++;
On 2024/10/8 0:50, Thomas Gleixner wrote:
> That means for batch processing the first object always has a pointer to
> the last object in a batch, which allows moving batches in a cache-line
> efficient way and reduces the lock hold time.

It seems that adding a helper function hlist_cut_position() can make the code
look more concise and clear. But there are a lot of patches now. We can do it
later, and maybe I can do it then.

Similar to the current list_cut_position():

/**
 * list_cut_position - cut a list into two
 * @list: a new list to add all removed entries
 * @head: a list with entries
 * @entry: an entry within head

Reviewed-by: Zhen Lei <thunder.leizhen@huawei.com>

--
Regards,
Zhen Lei
On Thu, Oct 10 2024 at 17:39, Leizhen wrote:
> It seems that adding a helper function hlist_cut_position() can make the code
> look more concise and clear. But there are a lot of patches now. We can do it
> later, and maybe I can do it then.
>
> Similar to the current list_cut_position():

Yes. Thought about that, but then ran out of cycles. Feel free to look at
that. Help is welcome.

Thanks,

        tglx
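For reference, list_cut_position() lives in include/linux/list.h; there is no
hlist counterpart at this point. A rough sketch of what the suggested
hlist_cut_position() could look like (hypothetical, not part of the posted
series; semantics modeled on list_cut_position(), with @last being the last
node to move):

/**
 * hlist_cut_position - cut an hlist into two (hypothetical sketch)
 * @list: a new list to receive the removed entries
 * @head: an hlist with entries
 * @last: the last entry to move, must be within @head
 */
static inline void hlist_cut_position(struct hlist_head *list,
                                      struct hlist_head *head,
                                      struct hlist_node *last)
{
        /* Transfer head's first entry up to and including @last to @list */
        list->first = head->first;
        if (list->first)
                list->first->pprev = &list->first;

        /* The remainder, if any, becomes the new head of @head */
        head->first = last->next;
        if (head->first)
                head->first->pprev = &head->first;

        /* Terminate the cut-off batch */
        last->next = NULL;
}

With such a helper, pool_pop_batch() could cut a whole batch off the source
list in one call, and pool_move_batch() could do the same plus a head splice,
instead of open-coding the pointer surgery in both places; obj->batch_last
would still supply the @last argument.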
The following commit has been merged into the core/debugobjects branch of tip:
Commit-ID: f57ebb92ba3e09a7e1082f147d6e1456d702d4b2
Gitweb: https://git.kernel.org/tip/f57ebb92ba3e09a7e1082f147d6e1456d702d4b2
Author: Thomas Gleixner <tglx@linutronix.de>
AuthorDate: Mon, 07 Oct 2024 18:50:17 +02:00
Committer: Thomas Gleixner <tglx@linutronix.de>
CommitterDate: Tue, 15 Oct 2024 17:30:33 +02:00
debugobjects: Implement batch processing
Adding and removing single objects in a loop is bad in terms of lock
contention and cache line accesses.
To implement batching, record the last object in a batch in the object
itself. This is trivially possible as hlists are strictly stacks. At a batch
boundary, when the first object is added to the list, the object stores a
pointer to itself in debug_obj::batch_last. When the next object is added
to the list, the batch_last pointer is retrieved from the first object
in the list and stored in the object to be added.

That means for batch processing the first object always has a pointer to
the last object in a batch, which allows moving batches in a cache-line
efficient way and reduces the lock hold time.
Signed-off-by: Thomas Gleixner <tglx@linutronix.de>
Reviewed-by: Zhen Lei <thunder.leizhen@huawei.com>
Link: https://lore.kernel.org/all/20241007164914.258995000@linutronix.de
---
lib/debugobjects.c | 61 +++++++++++++++++++++++++++++++++------------
1 file changed, 46 insertions(+), 15 deletions(-)
diff --git a/lib/debugobjects.c b/lib/debugobjects.c
index cdd5d23..4e80c31 100644
--- a/lib/debugobjects.c
+++ b/lib/debugobjects.c
@@ -149,18 +149,31 @@ static __always_inline bool pool_must_refill(struct obj_pool *pool)
static bool pool_move_batch(struct obj_pool *dst, struct obj_pool *src)
{
- if (dst->cnt + ODEBUG_BATCH_SIZE > dst->max_cnt || !src->cnt)
+ struct hlist_node *last, *next_batch, *first_batch;
+ struct debug_obj *obj;
+
+ if (dst->cnt >= dst->max_cnt || !src->cnt)
return false;
- for (int i = 0; i < ODEBUG_BATCH_SIZE && src->cnt; i++) {
- struct hlist_node *node = src->objects.first;
+ first_batch = src->objects.first;
+ obj = hlist_entry(first_batch, typeof(*obj), node);
+ last = obj->batch_last;
+ next_batch = last->next;
- WRITE_ONCE(src->cnt, src->cnt - 1);
- WRITE_ONCE(dst->cnt, dst->cnt + 1);
+ /* Move the next batch to the front of the source pool */
+ src->objects.first = next_batch;
+ if (next_batch)
+ next_batch->pprev = &src->objects.first;
- hlist_del(node);
- hlist_add_head(node, &dst->objects);
- }
+ /* Add the extracted batch to the destination pool */
+ last->next = dst->objects.first;
+ if (last->next)
+ last->next->pprev = &last->next;
+ first_batch->pprev = &dst->objects.first;
+ dst->objects.first = first_batch;
+
+ WRITE_ONCE(src->cnt, src->cnt - ODEBUG_BATCH_SIZE);
+ WRITE_ONCE(dst->cnt, dst->cnt + ODEBUG_BATCH_SIZE);
return true;
}
@@ -182,16 +195,27 @@ static bool pool_push_batch(struct obj_pool *dst, struct hlist_head *head)
static bool pool_pop_batch(struct hlist_head *head, struct obj_pool *src)
{
+ struct hlist_node *last, *next;
+ struct debug_obj *obj;
+
if (!src->cnt)
return false;
- for (int i = 0; src->cnt && i < ODEBUG_BATCH_SIZE; i++) {
- struct hlist_node *node = src->objects.first;
+ /* Move the complete list to the head */
+ hlist_move_list(&src->objects, head);
- WRITE_ONCE(src->cnt, src->cnt - 1);
- hlist_del(node);
- hlist_add_head(node, head);
- }
+ obj = hlist_entry(head->first, typeof(*obj), node);
+ last = obj->batch_last;
+ next = last->next;
+ /* Disconnect the batch from the list */
+ last->next = NULL;
+
+ /* Move the node after last back to the source pool. */
+ src->objects.first = next;
+ if (next)
+ next->pprev = &src->objects.first;
+
+ WRITE_ONCE(src->cnt, src->cnt - ODEBUG_BATCH_SIZE);
return true;
}
@@ -226,7 +250,7 @@ static struct debug_obj *pcpu_alloc(void)
if (!pool_move_batch(pcp, &pool_global))
return NULL;
}
- obj_pool_used += pcp->cnt;
+ obj_pool_used += ODEBUG_BATCH_SIZE;
if (obj_pool_used > obj_pool_max_used)
obj_pool_max_used = obj_pool_used;
@@ -239,9 +263,16 @@ static struct debug_obj *pcpu_alloc(void)
static void pcpu_free(struct debug_obj *obj)
{
struct obj_pool *pcp = this_cpu_ptr(&pool_pcpu);
+ struct debug_obj *first;
lockdep_assert_irqs_disabled();
+ if (!(pcp->cnt % ODEBUG_BATCH_SIZE)) {
+ obj->batch_last = &obj->node;
+ } else {
+ first = hlist_entry(pcp->objects.first, typeof(*first), node);
+ obj->batch_last = first->batch_last;
+ }
hlist_add_head(&obj->node, &pcp->objects);
pcp->cnt++;