[patch 21/25] debugobjects: Implement batch processing

Thomas Gleixner posted 25 patches 1 month, 3 weeks ago
[patch 21/25] debugobjects: Implement batch processing
Posted by Thomas Gleixner 1 month, 3 weeks ago
Adding and removing single objects in a loop is bad in terms of lock
contention and cache line accesses.

To implement batching, record the last object in a batch in the object
itself. This is trivially possible as hlists are strictly stacks. At a batch
boundary, when the first object is added to the list the object stores a
pointer to itself in debug_obj::batch_last. When the next object is added
to the list then the batch_last pointer is retrieved from the first object
in the list and stored in the to be added one.

That means for batch processing the first object always has a pointer to
the last object in a batch, which allows moving batches in a cache line
efficient way and reduces the lock hold time.

Signed-off-by: Thomas Gleixner <tglx@linutronix.de>
---
 lib/debugobjects.c |   61 +++++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 46 insertions(+), 15 deletions(-)

--- a/lib/debugobjects.c
+++ b/lib/debugobjects.c
@@ -149,18 +149,31 @@ static __always_inline bool pool_must_re
 
 static bool pool_move_batch(struct obj_pool *dst, struct obj_pool *src)
 {
-	if (dst->cnt + ODEBUG_BATCH_SIZE > dst->max_cnt || !src->cnt)
+	struct hlist_node *last, *next_batch, *first_batch;
+	struct debug_obj *obj;
+
+	if (dst->cnt >= dst->max_cnt || !src->cnt)
 		return false;
 
-	for (int i = 0; i < ODEBUG_BATCH_SIZE && src->cnt; i++) {
-		struct hlist_node *node = src->objects.first;
+	first_batch = src->objects.first;
+	obj = hlist_entry(first_batch, typeof(*obj), node);
+	last = obj->batch_last;
+	next_batch = last->next;
 
-		WRITE_ONCE(src->cnt, src->cnt - 1);
-		WRITE_ONCE(dst->cnt, dst->cnt + 1);
+	/* Move the next batch to the front of the source pool */
+	src->objects.first = next_batch;
+	if (next_batch)
+		next_batch->pprev = &src->objects.first;
+
+	/* Add the extracted batch to the destination pool */
+	last->next = dst->objects.first;
+	if (last->next)
+		last->next->pprev = &last->next;
+	first_batch->pprev = &dst->objects.first;
+	dst->objects.first = first_batch;
 
-		hlist_del(node);
-		hlist_add_head(node, &dst->objects);
-	}
+	WRITE_ONCE(src->cnt, src->cnt - ODEBUG_BATCH_SIZE);
+	WRITE_ONCE(dst->cnt, dst->cnt + ODEBUG_BATCH_SIZE);
 	return true;
 }
 
@@ -182,16 +195,27 @@ static bool pool_push_batch(struct obj_p
 
 static bool pool_pop_batch(struct hlist_head *head, struct obj_pool *src)
 {
+	struct hlist_node *last, *next;
+	struct debug_obj *obj;
+
 	if (!src->cnt)
 		return false;
 
-	for (int i = 0; src->cnt && i < ODEBUG_BATCH_SIZE; i++) {
-		struct hlist_node *node = src->objects.first;
+	/* Move the complete list to the head */
+	hlist_move_list(&src->objects, head);
 
-		WRITE_ONCE(src->cnt, src->cnt - 1);
-		hlist_del(node);
-		hlist_add_head(node, head);
-	}
+	obj = hlist_entry(head->first, typeof(*obj), node);
+	last = obj->batch_last;
+	next = last->next;
+	/* Disconnect the batch from the list */
+	last->next = NULL;
+
+	/* Move the node after last back to the source pool. */
+	src->objects.first = next;
+	if (next)
+		next->pprev = &src->objects.first;
+
+	WRITE_ONCE(src->cnt, src->cnt - ODEBUG_BATCH_SIZE);
 	return true;
 }
 
@@ -226,7 +250,7 @@ static struct debug_obj *pcpu_alloc(void
 			if (!pool_move_batch(pcp, &pool_global))
 				return NULL;
 		}
-		obj_pool_used += pcp->cnt;
+		obj_pool_used += ODEBUG_BATCH_SIZE;
 
 		if (obj_pool_used > obj_pool_max_used)
 			obj_pool_max_used = obj_pool_used;
@@ -239,9 +263,16 @@ static struct debug_obj *pcpu_alloc(void
 static void pcpu_free(struct debug_obj *obj)
 {
 	struct obj_pool *pcp = this_cpu_ptr(&pool_pcpu);
+	struct debug_obj *first;
 
 	lockdep_assert_irqs_disabled();
 
+	if (!(pcp->cnt % ODEBUG_BATCH_SIZE)) {
+		obj->batch_last = &obj->node;
+	} else {
+		first = hlist_entry(pcp->objects.first, typeof(*first), node);
+		obj->batch_last = first->batch_last;
+	}
 	hlist_add_head(&obj->node, &pcp->objects);
 	pcp->cnt++;
Re: [patch 21/25] debugobjects: Implement batch processing
Posted by Leizhen (ThunderTown) 1 month, 2 weeks ago

On 2024/10/8 0:50, Thomas Gleixner wrote:
> Adding and removing single objects in a loop is bad in terms of lock
> contention and cache line accesses.
> 
> To implement batching, record the last object in a batch in the object
> itself. This is trivially possible as hlists are strictly stacks. At a batch
> boundary, when the first object is added to the list the object stores a
> pointer to itself in debug_obj::batch_last. When the next object is added
> to the list then the batch_last pointer is retrieved from the first object
> in the list and stored in the to be added one.
> 
> That means for batch processing the first object always has a pointer to
> the last object in a batch, which allows to move batches in a cache line
> efficient way and reduces the lock held time.

It seems that adding a helper function hlist_cut_position() can make the code
look more concise and clear. But there's a lot of patches now. We can do it
later, and maybe I can do it then.

Similar to the current list_cut_position():
/**
 * list_cut_position - cut a list into two
 * @list: a new list to add all removed entries
 * @head: a list with entries
 * @entry: an entry within head


Reviewed-by: Zhen Lei <thunder.leizhen@huawei.com>

> 
> Signed-off-by: Thomas Gleixner <tglx@linutronix.de>
> ---
>  lib/debugobjects.c |   61 +++++++++++++++++++++++++++++++++++++++--------------
>  1 file changed, 46 insertions(+), 15 deletions(-)
> 
> --- a/lib/debugobjects.c
> +++ b/lib/debugobjects.c
> @@ -149,18 +149,31 @@ static __always_inline bool pool_must_re
>  
>  static bool pool_move_batch(struct obj_pool *dst, struct obj_pool *src)
>  {
> -	if (dst->cnt + ODEBUG_BATCH_SIZE > dst->max_cnt || !src->cnt)
> +	struct hlist_node *last, *next_batch, *first_batch;
> +	struct debug_obj *obj;
> +
> +	if (dst->cnt >= dst->max_cnt || !src->cnt)
>  		return false;
>  
> -	for (int i = 0; i < ODEBUG_BATCH_SIZE && src->cnt; i++) {
> -		struct hlist_node *node = src->objects.first;
> +	first_batch = src->objects.first;
> +	obj = hlist_entry(first_batch, typeof(*obj), node);
> +	last = obj->batch_last;
> +	next_batch = last->next;
>  
> -		WRITE_ONCE(src->cnt, src->cnt - 1);
> -		WRITE_ONCE(dst->cnt, dst->cnt + 1);
> +	/* Move the next batch to the front of the source pool */
> +	src->objects.first = next_batch;
> +	if (next_batch)
> +		next_batch->pprev = &src->objects.first;
> +
> +	/* Add the extracted batch to the destination pool */
> +	last->next = dst->objects.first;
> +	if (last->next)
> +		last->next->pprev = &last->next;
> +	first_batch->pprev = &dst->objects.first;
> +	dst->objects.first = first_batch;
>  
> -		hlist_del(node);
> -		hlist_add_head(node, &dst->objects);
> -	}
> +	WRITE_ONCE(src->cnt, src->cnt - ODEBUG_BATCH_SIZE);
> +	WRITE_ONCE(dst->cnt, dst->cnt + ODEBUG_BATCH_SIZE);
>  	return true;
>  }
>  
> @@ -182,16 +195,27 @@ static bool pool_push_batch(struct obj_p
>  
>  static bool pool_pop_batch(struct hlist_head *head, struct obj_pool *src)
>  {
> +	struct hlist_node *last, *next;
> +	struct debug_obj *obj;
> +
>  	if (!src->cnt)
>  		return false;
>  
> -	for (int i = 0; src->cnt && i < ODEBUG_BATCH_SIZE; i++) {
> -		struct hlist_node *node = src->objects.first;
> +	/* Move the complete list to the head */
> +	hlist_move_list(&src->objects, head);
>  
> -		WRITE_ONCE(src->cnt, src->cnt - 1);
> -		hlist_del(node);
> -		hlist_add_head(node, head);
> -	}
> +	obj = hlist_entry(head->first, typeof(*obj), node);
> +	last = obj->batch_last;
> +	next = last->next;
> +	/* Disconnect the batch from the list */
> +	last->next = NULL;
> +
> +	/* Move the node after last back to the source pool. */
> +	src->objects.first = next;
> +	if (next)
> +		next->pprev = &src->objects.first;
> +
> +	WRITE_ONCE(src->cnt, src->cnt - ODEBUG_BATCH_SIZE);
>  	return true;
>  }
>  
> @@ -226,7 +250,7 @@ static struct debug_obj *pcpu_alloc(void
>  			if (!pool_move_batch(pcp, &pool_global))
>  				return NULL;
>  		}
> -		obj_pool_used += pcp->cnt;
> +		obj_pool_used += ODEBUG_BATCH_SIZE;
>  
>  		if (obj_pool_used > obj_pool_max_used)
>  			obj_pool_max_used = obj_pool_used;
> @@ -239,9 +263,16 @@ static struct debug_obj *pcpu_alloc(void
>  static void pcpu_free(struct debug_obj *obj)
>  {
>  	struct obj_pool *pcp = this_cpu_ptr(&pool_pcpu);
> +	struct debug_obj *first;
>  
>  	lockdep_assert_irqs_disabled();
>  
> +	if (!(pcp->cnt % ODEBUG_BATCH_SIZE)) {
> +		obj->batch_last = &obj->node;
> +	} else {
> +		first = hlist_entry(pcp->objects.first, typeof(*first), node);
> +		obj->batch_last = first->batch_last;
> +	}
>  	hlist_add_head(&obj->node, &pcp->objects);
>  	pcp->cnt++;
>  
> 
> .
> 

-- 
Regards,
  Zhen Lei
Re: [patch 21/25] debugobjects: Implement batch processing
Posted by Thomas Gleixner 1 month, 2 weeks ago
On Thu, Oct 10 2024 at 17:39, Leizhen wrote:
> On 2024/10/8 0:50, Thomas Gleixner wrote:
>> Adding and removing single objects in a loop is bad in terms of lock
>> contention and cache line accesses.
>> 
>> To implement batching, record the last object in a batch in the object
>> itself. This is trivially possible as hlists are strictly stacks. At a batch
>> boundary, when the first object is added to the list the object stores a
>> pointer to itself in debug_obj::batch_last. When the next object is added
>> to the list then the batch_last pointer is retrieved from the first object
>> in the list and stored in the to be added one.
>> 
>> That means for batch processing the first object always has a pointer to
>> the last object in a batch, which allows to move batches in a cache line
>> efficient way and reduces the lock held time.
>
> It seems that adding a helper function hlist_cut_position() can make the code
> look more concise and clear. But there's a lot of patches now. We can do it
> later, and maybe I can do it then.
>
> Similar to the current list_cut_position():

Yes. Thought about that, but then ran out of cycles. Feel free to look
at that. Help is welcome.

Thanks,

        tglx
[tip: core/debugobjects] debugobjects: Implement batch processing
Posted by tip-bot2 for Thomas Gleixner 1 month, 1 week ago
The following commit has been merged into the core/debugobjects branch of tip:

Commit-ID:     f57ebb92ba3e09a7e1082f147d6e1456d702d4b2
Gitweb:        https://git.kernel.org/tip/f57ebb92ba3e09a7e1082f147d6e1456d702d4b2
Author:        Thomas Gleixner <tglx@linutronix.de>
AuthorDate:    Mon, 07 Oct 2024 18:50:17 +02:00
Committer:     Thomas Gleixner <tglx@linutronix.de>
CommitterDate: Tue, 15 Oct 2024 17:30:33 +02:00

debugobjects: Implement batch processing

Adding and removing single objects in a loop is bad in terms of lock
contention and cache line accesses.

To implement batching, record the last object in a batch in the object
itself. This is trivially possible as hlists are strictly stacks. At a batch
boundary, when the first object is added to the list the object stores a
pointer to itself in debug_obj::batch_last. When the next object is added
to the list then the batch_last pointer is retrieved from the first object
in the list and stored in the to be added one.

That means for batch processing the first object always has a pointer to
the last object in a batch, which allows moving batches in a cache line
efficient way and reduces the lock hold time.

Signed-off-by: Thomas Gleixner <tglx@linutronix.de>
Reviewed-by: Zhen Lei <thunder.leizhen@huawei.com>
Link: https://lore.kernel.org/all/20241007164914.258995000@linutronix.de

---
 lib/debugobjects.c | 61 +++++++++++++++++++++++++++++++++------------
 1 file changed, 46 insertions(+), 15 deletions(-)

diff --git a/lib/debugobjects.c b/lib/debugobjects.c
index cdd5d23..4e80c31 100644
--- a/lib/debugobjects.c
+++ b/lib/debugobjects.c
@@ -149,18 +149,31 @@ static __always_inline bool pool_must_refill(struct obj_pool *pool)
 
 static bool pool_move_batch(struct obj_pool *dst, struct obj_pool *src)
 {
-	if (dst->cnt + ODEBUG_BATCH_SIZE > dst->max_cnt || !src->cnt)
+	struct hlist_node *last, *next_batch, *first_batch;
+	struct debug_obj *obj;
+
+	if (dst->cnt >= dst->max_cnt || !src->cnt)
 		return false;
 
-	for (int i = 0; i < ODEBUG_BATCH_SIZE && src->cnt; i++) {
-		struct hlist_node *node = src->objects.first;
+	first_batch = src->objects.first;
+	obj = hlist_entry(first_batch, typeof(*obj), node);
+	last = obj->batch_last;
+	next_batch = last->next;
 
-		WRITE_ONCE(src->cnt, src->cnt - 1);
-		WRITE_ONCE(dst->cnt, dst->cnt + 1);
+	/* Move the next batch to the front of the source pool */
+	src->objects.first = next_batch;
+	if (next_batch)
+		next_batch->pprev = &src->objects.first;
 
-		hlist_del(node);
-		hlist_add_head(node, &dst->objects);
-	}
+	/* Add the extracted batch to the destination pool */
+	last->next = dst->objects.first;
+	if (last->next)
+		last->next->pprev = &last->next;
+	first_batch->pprev = &dst->objects.first;
+	dst->objects.first = first_batch;
+
+	WRITE_ONCE(src->cnt, src->cnt - ODEBUG_BATCH_SIZE);
+	WRITE_ONCE(dst->cnt, dst->cnt + ODEBUG_BATCH_SIZE);
 	return true;
 }
 
@@ -182,16 +195,27 @@ static bool pool_push_batch(struct obj_pool *dst, struct hlist_head *head)
 
 static bool pool_pop_batch(struct hlist_head *head, struct obj_pool *src)
 {
+	struct hlist_node *last, *next;
+	struct debug_obj *obj;
+
 	if (!src->cnt)
 		return false;
 
-	for (int i = 0; src->cnt && i < ODEBUG_BATCH_SIZE; i++) {
-		struct hlist_node *node = src->objects.first;
+	/* Move the complete list to the head */
+	hlist_move_list(&src->objects, head);
 
-		WRITE_ONCE(src->cnt, src->cnt - 1);
-		hlist_del(node);
-		hlist_add_head(node, head);
-	}
+	obj = hlist_entry(head->first, typeof(*obj), node);
+	last = obj->batch_last;
+	next = last->next;
+	/* Disconnect the batch from the list */
+	last->next = NULL;
+
+	/* Move the node after last back to the source pool. */
+	src->objects.first = next;
+	if (next)
+		next->pprev = &src->objects.first;
+
+	WRITE_ONCE(src->cnt, src->cnt - ODEBUG_BATCH_SIZE);
 	return true;
 }
 
@@ -226,7 +250,7 @@ static struct debug_obj *pcpu_alloc(void)
 			if (!pool_move_batch(pcp, &pool_global))
 				return NULL;
 		}
-		obj_pool_used += pcp->cnt;
+		obj_pool_used += ODEBUG_BATCH_SIZE;
 
 		if (obj_pool_used > obj_pool_max_used)
 			obj_pool_max_used = obj_pool_used;
@@ -239,9 +263,16 @@ static struct debug_obj *pcpu_alloc(void)
 static void pcpu_free(struct debug_obj *obj)
 {
 	struct obj_pool *pcp = this_cpu_ptr(&pool_pcpu);
+	struct debug_obj *first;
 
 	lockdep_assert_irqs_disabled();
 
+	if (!(pcp->cnt % ODEBUG_BATCH_SIZE)) {
+		obj->batch_last = &obj->node;
+	} else {
+		first = hlist_entry(pcp->objects.first, typeof(*first), node);
+		obj->batch_last = first->batch_last;
+	}
 	hlist_add_head(&obj->node, &pcp->objects);
 	pcp->cnt++;