[PATCH v1 16/20] node_device_udev: Use a worker pool for processing events and emitting nodedev event

Marc Hartmayer posted 20 patches 1 year, 9 months ago
There is a newer version of this series
[PATCH v1 16/20] node_device_udev: Use a worker pool for processing events and emitting nodedev event
Posted by Marc Hartmayer 1 year, 9 months ago
Use a worker pool for processing the events (e.g. udev, mdevctl config changes)
and the initialization instead of a separate initThread and an mdevctl-thread.
This has the big advantage that we can leverage the job API, and the thread
pool is now responsible for doing all the "costly work" and for emitting the
libvirt nodedev events.

Signed-off-by: Marc Hartmayer <mhartmay@linux.ibm.com>
---
 src/node_device/node_device_udev.c | 244 +++++++++++++++++++++--------
 1 file changed, 179 insertions(+), 65 deletions(-)

diff --git a/src/node_device/node_device_udev.c b/src/node_device/node_device_udev.c
index e4b1532dc385..67a8b5cd7132 100644
--- a/src/node_device/node_device_udev.c
+++ b/src/node_device/node_device_udev.c
@@ -43,6 +43,7 @@
 #include "virnetdev.h"
 #include "virmdev.h"
 #include "virutil.h"
+#include "virthreadpool.h"
 
 #include "configmake.h"
 
@@ -69,13 +70,13 @@ struct _udevEventData {
     bool udevThreadQuit;
     bool udevDataReady;
 
-    /* init thread */
-    virThread *initThread;
-
     /* Protects @mdevctlMonitors */
     virMutex mdevctlLock;
     GList *mdevctlMonitors;
     int mdevctlTimeout;
+
+    /* Immutable pointer, self-locking APIs */
+    virThreadPool *workerPool;
 };
 
 static virClass *udevEventDataClass;
@@ -86,8 +87,6 @@ udevEventDataDispose(void *obj)
     struct udev *udev = NULL;
     udevEventData *priv = obj;
 
-    g_clear_pointer(&priv->initThread, g_free);
-
     VIR_WITH_MUTEX_LOCK_GUARD(&priv->mdevctlLock) {
         g_list_free_full(g_steal_pointer(&priv->mdevctlMonitors), g_object_unref);
     }
@@ -100,6 +99,8 @@ udevEventDataDispose(void *obj)
         udev_unref(udev);
     }
 
+    g_clear_pointer(&priv->workerPool, virThreadPoolFree);
+
     virMutexDestroy(&priv->mdevctlLock);
 
     virCondDestroy(&priv->udevThreadCond);
@@ -143,6 +144,66 @@ udevEventDataNew(void)
     return ret;
 }
 
+typedef enum {
+  NODE_DEVICE_EVENT_INIT = 0,
+  NODE_DEVICE_EVENT_UDEV_ADD,
+  NODE_DEVICE_EVENT_UDEV_REMOVE,
+  NODE_DEVICE_EVENT_UDEV_CHANGE,
+  NODE_DEVICE_EVENT_UDEV_MOVE,
+  NODE_DEVICE_EVENT_MDEVCTL_CONFIG_CHANGED,
+
+  NODE_DEVICE_EVENT_LAST
+} nodeDeviceEventType;
+
+struct _nodeDeviceEvent {
+    nodeDeviceEventType eventType;
+    void *data;
+    virFreeCallback dataFreeFunc;
+};
+typedef struct _nodeDeviceEvent nodeDeviceEvent;
+
+static void
+nodeDeviceEventFree(nodeDeviceEvent *event)
+{
+    if (!event)
+        return;
+
+    if (event->dataFreeFunc)
+        event->dataFreeFunc(event->data);
+    g_free(event);
+}
+G_DEFINE_AUTOPTR_CLEANUP_FUNC(nodeDeviceEvent, nodeDeviceEventFree);
+
+ /**
+  * nodeDeviceEventSubmit:
+  * @eventType: the event to be processed
+  * @data: additional data for the event processor (the pointer is stolen and it
+  *        will be properly freed using @dataFreeFunc)
+  * @dataFreeFunc: callback to free @data
+  *
+  * Submits @eventType to be processed by the asynchronous event handling
+  * thread.
+  */
+static int nodeDeviceEventSubmit(nodeDeviceEventType eventType, void *data, virFreeCallback dataFreeFunc)
+{
+    nodeDeviceEvent *event = g_new0(nodeDeviceEvent, 1);
+    udevEventData *priv = NULL;
+
+    if (!driver)
+        return -1;
+
+    priv = driver->privateData;
+
+    event->eventType = eventType;
+    event->data = data;
+    event->dataFreeFunc = dataFreeFunc;
+    if (virThreadPoolSendJob(priv->workerPool, 0, event) < 0) {
+        nodeDeviceEventFree(event);
+        return -1;
+    }
+    return 0;
+}
+
 
 static bool
 udevHasDeviceProperty(struct udev_device *dev,
@@ -1447,7 +1508,7 @@ static void scheduleMdevctlUpdate(udevEventData *data, bool force);
 
 
 static int
-udevRemoveOneDeviceSysPath(virNodeDeviceDriverState *driver_state, const char *path)
+processNodeDeviceRemoveEvent(virNodeDeviceDriverState *driver_state, const char *path)
 {
     virNodeDeviceObj *obj = NULL;
     virNodeDeviceDef *def;
@@ -1529,7 +1590,7 @@ udevSetParent(virNodeDeviceDriverState *driver_state, struct udev_device *device
 }
 
 static int
-udevAddOneDevice(virNodeDeviceDriverState *driver_state, struct udev_device *device)
+processNodeDeviceAddAndChangeEvent(virNodeDeviceDriverState *driver_state, struct udev_device *device)
 {
     g_autofree char *sysfs_path = NULL;
     virNodeDeviceDef *def = NULL;
@@ -1647,7 +1708,7 @@ udevProcessDeviceListEntry(virNodeDeviceDriverState *driver_state, struct udev *
     device = udev_device_new_from_syspath(udev, name);
 
     if (device != NULL) {
-        if (udevAddOneDevice(driver_state, device) != 0) {
+        if (processNodeDeviceAddAndChangeEvent(driver_state, device) != 0) {
             VIR_DEBUG("Failed to create node device for udev device '%s'",
                       name);
         }
@@ -1755,26 +1816,23 @@ udevHandleOneDevice(struct udev_device *device)
 
     VIR_DEBUG("udev action: '%s': %s", action, udev_device_get_syspath(device));
 
-    if (STREQ(action, "add") || STREQ(action, "change"))
-        return udevAddOneDevice(driver, device);
-
-    if (STREQ(action, "remove")) {
-        const char *path = udev_device_get_syspath(device);
-
-        return udevRemoveOneDeviceSysPath(driver, path);
-    }
-
-    if (STREQ(action, "move")) {
-        const char *devpath_old = udevGetDeviceProperty(device, "DEVPATH_OLD");
-
-        if (devpath_old) {
-            g_autofree char *devpath_old_fixed = g_strdup_printf("/sys%s", devpath_old);
-
-            udevRemoveOneDeviceSysPath(driver, devpath_old_fixed);
-        }
-
-        return udevAddOneDevice(driver, device);
+    /* Reference is either released via workerpool logic or at the end of this
+     * function. */
+    device = udev_device_ref(device);
+    if (STREQ(action, "add")) {
+        return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_ADD, device,
+                                     (virFreeCallback)udev_device_unref);
+    } else if (STREQ(action, "change")) {
+        return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_CHANGE, device,
+                                     (virFreeCallback)udev_device_unref);
+    } else if (STREQ(action, "remove")) {
+        return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_REMOVE, device,
+                                     (virFreeCallback)udev_device_unref);
+    } else if (STREQ(action, "move")) {
+        return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_MOVE, device,
+                                     (virFreeCallback)udev_device_unref);
     }
+    udev_device_unref(device);
 
     return 0;
 }
@@ -1993,23 +2051,23 @@ udevSetupSystemDev(void)
 
 
 static void
-nodeStateInitializeEnumerate(void *opaque)
+processNodeStateInitializeEnumerate(virNodeDeviceDriverState *driver_state, void *opaque)
 {
-    udevEventData *priv = driver->privateData;
+    udevEventData *priv = driver_state->privateData;
     struct udev *udev = opaque;
 
     /* Populate with known devices */
-    if (udevEnumerateDevices(driver, udev) != 0)
+    if (udevEnumerateDevices(driver_state, udev) != 0)
         goto error;
     /* Load persistent mdevs (which might not be activated yet) and additional
      * information about active mediated devices from mdevctl */
-    if (nodeDeviceUpdateMediatedDevices(driver) != 0)
+    if (nodeDeviceUpdateMediatedDevices(driver_state) != 0)
         goto error;
 
  cleanup:
-    VIR_WITH_MUTEX_LOCK_GUARD(&driver->lock) {
-        driver->initialized = true;
-        virCondBroadcast(&driver->initCond);
+    VIR_WITH_MUTEX_LOCK_GUARD(&driver_state->lock) {
+        driver_state->initialized = true;
+        virCondBroadcast(&driver_state->initCond);
     }
 
     return;
@@ -2051,31 +2109,16 @@ udevPCITranslateInit(bool privileged G_GNUC_UNUSED)
 
 
 static void
-mdevctlUpdateThreadFunc(void *opaque)
-{
-    virNodeDeviceDriverState *driver_state = opaque;
-
-    if (nodeDeviceUpdateMediatedDevices(driver_state) < 0)
-        VIR_WARN("mdevctl failed to update mediated devices");
-}
-
-
-static void
-launchMdevctlUpdateThread(int timer G_GNUC_UNUSED, void *opaque)
+submitMdevctlUpdate(int timer G_GNUC_UNUSED, void *opaque)
 {
     udevEventData *priv = opaque;
-    virThread thread;
 
     if (priv->mdevctlTimeout != -1) {
         virEventRemoveTimeout(priv->mdevctlTimeout);
         priv->mdevctlTimeout = -1;
     }
 
-    if (virThreadCreateFull(&thread, false, mdevctlUpdateThreadFunc,
-                            "mdevctl-thread", false, driver) < 0) {
-        virReportSystemError(errno, "%s",
-                             _("failed to create mdevctl thread"));
-    }
+    nodeDeviceEventSubmit(NODE_DEVICE_EVENT_MDEVCTL_CONFIG_CHANGED,  NULL, NULL);
 }
 
 
@@ -2170,7 +2213,7 @@ mdevctlEnableMonitor(udevEventData *priv)
 /* Schedules an mdevctl update for 100ms in the future, canceling any existing
  * timeout that may have been set. In this way, multiple update requests in
  * quick succession can be collapsed into a single update. if @force is true,
- * an update thread will be spawned immediately. */
+ * the worker job is submitted immediately. */
 static void
 scheduleMdevctlUpdate(udevEventData *data,
                       bool force)
@@ -2178,12 +2221,12 @@ scheduleMdevctlUpdate(udevEventData *data,
     if (!force) {
         if (data->mdevctlTimeout != -1)
             virEventRemoveTimeout(data->mdevctlTimeout);
-        data->mdevctlTimeout = virEventAddTimeout(100, launchMdevctlUpdateThread,
+        data->mdevctlTimeout = virEventAddTimeout(100, submitMdevctlUpdate,
                                                   data, NULL);
         return;
     }
 
-    launchMdevctlUpdateThread(-1, data);
+    submitMdevctlUpdate(-1, data);
 }
 
 
@@ -2223,6 +2266,62 @@ mdevctlEventHandleCallback(GFileMonitor *monitor G_GNUC_UNUSED,
 }
 
 
+static void nodeDeviceEventHandler(void *data, void *opaque)
+{
+    virNodeDeviceDriverState *driver_state = opaque;
+    g_autoptr(nodeDeviceEvent) processEvent = data;
+
+    switch (processEvent->eventType) {
+    case NODE_DEVICE_EVENT_INIT:
+    {
+        struct udev *udev = processEvent->data;
+
+        processNodeStateInitializeEnumerate(driver_state, udev);
+    }
+    break;
+    case NODE_DEVICE_EVENT_UDEV_ADD:
+    case NODE_DEVICE_EVENT_UDEV_CHANGE:
+    {
+        struct udev_device *device = processEvent->data;
+
+        processNodeDeviceAddAndChangeEvent(driver_state, device);
+    }
+    break;
+    case NODE_DEVICE_EVENT_UDEV_REMOVE:
+    {
+        struct udev_device *device = processEvent->data;
+        const char *path = udev_device_get_syspath(device);
+
+        processNodeDeviceRemoveEvent(driver_state, path);
+    }
+    break;
+    case NODE_DEVICE_EVENT_UDEV_MOVE:
+    {
+        struct udev_device *device = processEvent->data;
+        const char *devpath_old = udevGetDeviceProperty(device, "DEVPATH_OLD");
+
+        if (devpath_old) {
+            g_autofree char *devpath_old_fixed = g_strdup_printf("/sys%s", devpath_old);
+
+            processNodeDeviceRemoveEvent(driver_state, devpath_old_fixed);
+        }
+
+        processNodeDeviceAddAndChangeEvent(driver_state, device);
+    }
+    break;
+    case NODE_DEVICE_EVENT_MDEVCTL_CONFIG_CHANGED:
+    {
+        if (nodeDeviceUpdateMediatedDevices(driver_state) < 0)
+            VIR_WARN("mdevctl failed to update mediated devices");
+    }
+    break;
+    case NODE_DEVICE_EVENT_LAST:
+        g_assert_not_reached();
+        break;
+    }
+}
+
+
 /* Note: It must be safe to call this function even if the driver was not
  *       successfully initialized. This must be considered when changing this
  *       function. */
@@ -2258,6 +2357,9 @@ nodeStateShutdownPrepare(void)
         priv->udevThreadQuit = true;
         virCondSignal(&priv->udevThreadCond);
     }
+
+    if (priv->workerPool)
+        virThreadPoolStop(priv->workerPool);
     return 0;
 }
 
@@ -2278,11 +2380,19 @@ nodeStateShutdownWait(void)
         return 0;
 
     VIR_WITH_OBJECT_LOCK_GUARD(priv) {
-        if (priv->initThread)
-            virThreadJoin(priv->initThread);
-        if (priv->udevThread)
-            virThreadJoin(priv->udevThread);
+        if (priv->mdevctlTimeout != -1) {
+            virEventRemoveTimeout(priv->mdevctlTimeout);
+            priv->mdevctlTimeout = -1;
+        }
+
+        if (priv->watch) {
+            virEventRemoveHandle(priv->watch);
+            priv->watch = -1;
+        }
     }
+
+    if (priv->workerPool)
+        virThreadPoolDrain(priv->workerPool);
     return 0;
 }
 
@@ -2353,6 +2463,16 @@ nodeStateInitialize(bool privileged,
     driver->parserCallbacks.postParse = nodeDeviceDefPostParse;
     driver->parserCallbacks.validate = nodeDeviceDefValidate;
 
+    /* must be initialized before trying to reconnect to all the running mdevs
+     * since there might occur some mdevctl monitor events that will be
+     * dispatched to the worker pool */
+    priv->workerPool = virThreadPoolNewFull(1, 1, 0, nodeDeviceEventHandler,
+                                            "nodev-device-event",
+                                            NULL,
+                                            driver);
+    if (!priv->workerPool)
+        goto unlock;
+
     if (udevPCITranslateInit(privileged) < 0)
         goto unlock;
 
@@ -2410,13 +2530,7 @@ nodeStateInitialize(bool privileged,
     if (udevSetupSystemDev() != 0)
         goto cleanup;
 
-    priv->initThread = g_new0(virThread, 1);
-    if (virThreadCreateFull(priv->initThread, true, nodeStateInitializeEnumerate,
-                            "nodedev-init", false, udev) < 0) {
-        virReportSystemError(errno, "%s",
-                             _("failed to create udev enumerate thread"));
-        goto cleanup;
-    }
+    nodeDeviceEventSubmit(NODE_DEVICE_EVENT_INIT, udev_ref(udev), (virFreeCallback)udev_unref);
 
     return VIR_DRV_STATE_INIT_COMPLETE;
 
-- 
2.34.1
_______________________________________________
Devel mailing list -- devel@lists.libvirt.org
To unsubscribe send an email to devel-leave@lists.libvirt.org
Re: [PATCH v1 16/20] node_device_udev: Use a worker pool for processing events and emitting nodedev event
Posted by Marc Hartmayer 1 year, 9 months ago
On Fri, Apr 19, 2024 at 04:49 PM +0200, Marc Hartmayer <mhartmay@linux.ibm.com> wrote:
> Use a worker pool for processing the events (e.g. udev, mdevctl config changes)
> and the initialization instead of a separate initThread and a mdevctl-thread.
> This has the large advantage that we can leverage the job API and now this
> thread pool is responsible to do all the "costly-work" and emitting the libvirt
> nodedev events.
>
> Signed-off-by: Marc Hartmayer <mhartmay@linux.ibm.com>
> ---

[…snip…]

>  
>  
> +static void nodeDeviceEventHandler(void *data, void *opaque)
> +{
> +    virNodeDeviceDriverState *driver_state = opaque;
> +    g_autoptr(nodeDeviceEvent) processEvent = data;
> +
> +    switch (processEvent->eventType) {
> +    case NODE_DEVICE_EVENT_INIT:
> +    {
> +        struct udev *udev = processEvent->data;
> +
> +        processNodeStateInitializeEnumerate(driver_state, udev);
> +    }
> +    break;
> +    case NODE_DEVICE_EVENT_UDEV_ADD:
> +    case NODE_DEVICE_EVENT_UDEV_CHANGE:
> +    {
> +        struct udev_device *device = processEvent->data;
> +
> +        processNodeDeviceAddAndChangeEvent(driver_state, device);
> +    }
> +    break;
> +    case NODE_DEVICE_EVENT_UDEV_REMOVE:
> +    {
> +        struct udev_device *device = processEvent->data;
> +        const char *path = udev_device_get_syspath(device);
> +
> +        processNodeDeviceRemoveEvent(driver_state, path);
> +    }
> +    break;
> +    case NODE_DEVICE_EVENT_UDEV_MOVE:
> +    {
> +        struct udev_device *device = processEvent->data;
> +        const char *devpath_old = udevGetDeviceProperty(device, "DEVPATH_OLD");
> +
> +        if (devpath_old) {
> +            g_autofree char *devpath_old_fixed = g_strdup_printf("/sys%s", devpath_old);
> +
> +            processNodeDeviceRemoveEvent(driver_state, devpath_old_fixed);
> +        }
> +
> +        processNodeDeviceAddAndChangeEvent(driver_state, device);
> +    }
> +    break;
> +    case NODE_DEVICE_EVENT_MDEVCTL_CONFIG_CHANGED:
> +    {
> +        if (nodeDeviceUpdateMediatedDevices(driver_state) < 0)
> +            VIR_WARN("mdevctl failed to update mediated devices");
> +    }
> +    break;
> +    case NODE_DEVICE_EVENT_LAST:

> +        g_assert_not_reached();

The assert statement should be replaced with:

virReportEnumRangeError(nodeDeviceEventType, processEvent->eventType);

> +        break;
> +    }
> +}

[…snip…]

-- 
Kind regards / Beste Grüße
   Marc Hartmayer

IBM Deutschland Research & Development GmbH
Vorsitzender des Aufsichtsrats: Wolfgang Wendt
Geschäftsführung: David Faller
Sitz der Gesellschaft: Böblingen
Registergericht: Amtsgericht Stuttgart, HRB 243294
_______________________________________________
Devel mailing list -- devel@lists.libvirt.org
To unsubscribe send an email to devel-leave@lists.libvirt.org
Re: [PATCH v1 16/20] node_device_udev: Use a worker pool for processing events and emitting nodedev event
Posted by Boris Fiuczynski 1 year, 9 months ago
With the too-many-rebase-add-change and the additional comment 
explaining why currently only one worker must be used

Reviewed-by: Boris Fiuczynski <fiuczy@linux.ibm.com>

On 4/19/24 16:49, Marc Hartmayer wrote:
> Use a worker pool for processing the events (e.g. udev, mdevctl config changes)
> and the initialization instead of a separate initThread and a mdevctl-thread.
> This has the large advantage that we can leverage the job API and now this
> thread pool is responsible to do all the "costly-work" and emitting the libvirt
> nodedev events.
> 
> Signed-off-by: Marc Hartmayer <mhartmay@linux.ibm.com>
> ---
>   src/node_device/node_device_udev.c | 244 +++++++++++++++++++++--------
>   1 file changed, 179 insertions(+), 65 deletions(-)
> 
> diff --git a/src/node_device/node_device_udev.c b/src/node_device/node_device_udev.c
> index e4b1532dc385..67a8b5cd7132 100644
> --- a/src/node_device/node_device_udev.c
> +++ b/src/node_device/node_device_udev.c
> @@ -43,6 +43,7 @@
>   #include "virnetdev.h"
>   #include "virmdev.h"
>   #include "virutil.h"
> +#include "virthreadpool.h"
>   
>   #include "configmake.h"
>   
> @@ -69,13 +70,13 @@ struct _udevEventData {
>       bool udevThreadQuit;
>       bool udevDataReady;
>   
> -    /* init thread */
> -    virThread *initThread;
> -
>       /* Protects @mdevctlMonitors */
>       virMutex mdevctlLock;
>       GList *mdevctlMonitors;
>       int mdevctlTimeout;
> +
> +    /* Immutable pointer, self-locking APIs */
> +    virThreadPool *workerPool;
>   };
>   
>   static virClass *udevEventDataClass;
> @@ -86,8 +87,6 @@ udevEventDataDispose(void *obj)
>       struct udev *udev = NULL;
>       udevEventData *priv = obj;
>   
> -    g_clear_pointer(&priv->initThread, g_free);
> -
>       VIR_WITH_MUTEX_LOCK_GUARD(&priv->mdevctlLock) {
>           g_list_free_full(g_steal_pointer(&priv->mdevctlMonitors), g_object_unref);
>       }
> @@ -100,6 +99,8 @@ udevEventDataDispose(void *obj)
>           udev_unref(udev);
>       }
>   
> +    g_clear_pointer(&priv->workerPool, virThreadPoolFree);
> +
>       virMutexDestroy(&priv->mdevctlLock);
>   
>       virCondDestroy(&priv->udevThreadCond);
> @@ -143,6 +144,66 @@ udevEventDataNew(void)
>       return ret;
>   }
>   
> +typedef enum {
> +  NODE_DEVICE_EVENT_INIT = 0,
> +  NODE_DEVICE_EVENT_UDEV_ADD,
> +  NODE_DEVICE_EVENT_UDEV_REMOVE,
> +  NODE_DEVICE_EVENT_UDEV_CHANGE,
> +  NODE_DEVICE_EVENT_UDEV_MOVE,
> +  NODE_DEVICE_EVENT_MDEVCTL_CONFIG_CHANGED,
> +
> +  NODE_DEVICE_EVENT_LAST
> +} nodeDeviceEventType;
> +
> +struct _nodeDeviceEvent {
> +    nodeDeviceEventType eventType;
> +    void *data;
> +    virFreeCallback dataFreeFunc;
> +};
> +typedef struct _nodeDeviceEvent nodeDeviceEvent;
> +
> +static void
> +nodeDeviceEventFree(nodeDeviceEvent *event)
> +{
> +    if (!event)
> +        return;
> +
> +    if (event->dataFreeFunc)
> +        event->dataFreeFunc(event->data);
> +    g_free(event);
> +}
> +G_DEFINE_AUTOPTR_CLEANUP_FUNC(nodeDeviceEvent, nodeDeviceEventFree);
> +
> + /**
> +  * nodeDeviceEventSubmit:
> +  * @eventType: the event to be processed
> +  * @data: additional data for the event processor (the pointer is stolen and it
> +  *        will be properly freed using @dataFreeFunc)
> +  * @dataFreeFunc: callback to free @data
> +  *
> +  * Submits @eventType to be processed by the asynchronous event handling
> +  * thread.
> +  */
> +static int nodeDeviceEventSubmit(nodeDeviceEventType eventType, void *data, virFreeCallback dataFreeFunc)
> +{
> +    nodeDeviceEvent *event = g_new0(nodeDeviceEvent, 1);
> +    udevEventData *priv = NULL;
> +
> +    if (!driver)
> +        return -1;
> +
> +    priv = driver->privateData;
> +
> +    event->eventType = eventType;
> +    event->data = data;
> +    event->dataFreeFunc = dataFreeFunc;
> +    if (virThreadPoolSendJob(priv->workerPool, 0, event) < 0) {
> +        nodeDeviceEventFree(event);
> +        return -1;
> +    }
> +    return 0;
> +}
> +
>   
>   static bool
>   udevHasDeviceProperty(struct udev_device *dev,
> @@ -1447,7 +1508,7 @@ static void scheduleMdevctlUpdate(udevEventData *data, bool force);
>   
>   
>   static int
> -udevRemoveOneDeviceSysPath(virNodeDeviceDriverState *driver_state, const char *path)
> +processNodeDeviceRemoveEvent(virNodeDeviceDriverState *driver_state, const char *path)
>   {
>       virNodeDeviceObj *obj = NULL;
>       virNodeDeviceDef *def;
> @@ -1529,7 +1590,7 @@ udevSetParent(virNodeDeviceDriverState *driver_state, struct udev_device *device
>   }
>   
>   static int
> -udevAddOneDevice(virNodeDeviceDriverState *driver_state, struct udev_device *device)
> +processNodeDeviceAddAndChangeEvent(virNodeDeviceDriverState *driver_state, struct udev_device *device)
>   {
>       g_autofree char *sysfs_path = NULL;
>       virNodeDeviceDef *def = NULL;
> @@ -1647,7 +1708,7 @@ udevProcessDeviceListEntry(virNodeDeviceDriverState *driver_state, struct udev *
>       device = udev_device_new_from_syspath(udev, name);
>   
>       if (device != NULL) {
> -        if (udevAddOneDevice(driver_state, device) != 0) {
> +        if (processNodeDeviceAddAndChangeEvent(driver_state, device) != 0) {
>               VIR_DEBUG("Failed to create node device for udev device '%s'",
>                         name);
>           }
> @@ -1755,26 +1816,23 @@ udevHandleOneDevice(struct udev_device *device)
>   
>       VIR_DEBUG("udev action: '%s': %s", action, udev_device_get_syspath(device));
>   
> -    if (STREQ(action, "add") || STREQ(action, "change"))
> -        return udevAddOneDevice(driver, device);
> -
> -    if (STREQ(action, "remove")) {
> -        const char *path = udev_device_get_syspath(device);
> -
> -        return udevRemoveOneDeviceSysPath(driver, path);
> -    }
> -
> -    if (STREQ(action, "move")) {
> -        const char *devpath_old = udevGetDeviceProperty(device, "DEVPATH_OLD");
> -
> -        if (devpath_old) {
> -            g_autofree char *devpath_old_fixed = g_strdup_printf("/sys%s", devpath_old);
> -
> -            udevRemoveOneDeviceSysPath(driver, devpath_old_fixed);
> -        }
> -
> -        return udevAddOneDevice(driver, device);
> +    /* Reference is either released via workerpool logic or at the end of this
> +     * function. */
> +    device = udev_device_ref(device);
> +    if (STREQ(action, "add")) {
> +        return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_ADD, device,
> +                                     (virFreeCallback)udev_device_unref);
> +    } else if (STREQ(action, "change")) {
> +        return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_CHANGE, device,
> +                                     (virFreeCallback)udev_device_unref);
> +    } else if (STREQ(action, "remove")) {
> +        return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_REMOVE, device,
> +                                     (virFreeCallback)udev_device_unref);
> +    } else if (STREQ(action, "move")) {
> +        return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_MOVE, device,
> +                                     (virFreeCallback)udev_device_unref);
>       }
> +    udev_device_unref(device);
>   
>       return 0;
>   }
> @@ -1993,23 +2051,23 @@ udevSetupSystemDev(void)
>   
>   
>   static void
> -nodeStateInitializeEnumerate(void *opaque)
> +processNodeStateInitializeEnumerate(virNodeDeviceDriverState *driver_state, void *opaque)
>   {
> -    udevEventData *priv = driver->privateData;
> +    udevEventData *priv = driver_state->privateData;
>       struct udev *udev = opaque;
>   
>       /* Populate with known devices */
> -    if (udevEnumerateDevices(driver, udev) != 0)
> +    if (udevEnumerateDevices(driver_state, udev) != 0)
>           goto error;
>       /* Load persistent mdevs (which might not be activated yet) and additional
>        * information about active mediated devices from mdevctl */
> -    if (nodeDeviceUpdateMediatedDevices(driver) != 0)
> +    if (nodeDeviceUpdateMediatedDevices(driver_state) != 0)
>           goto error;
>   
>    cleanup:
> -    VIR_WITH_MUTEX_LOCK_GUARD(&driver->lock) {
> -        driver->initialized = true;
> -        virCondBroadcast(&driver->initCond);
> +    VIR_WITH_MUTEX_LOCK_GUARD(&driver_state->lock) {
> +        driver_state->initialized = true;
> +        virCondBroadcast(&driver_state->initCond);
>       }
>   
>       return;
> @@ -2051,31 +2109,16 @@ udevPCITranslateInit(bool privileged G_GNUC_UNUSED)
>   
>   
>   static void
> -mdevctlUpdateThreadFunc(void *opaque)
> -{
> -    virNodeDeviceDriverState *driver_state = opaque;
> -
> -    if (nodeDeviceUpdateMediatedDevices(driver_state) < 0)
> -        VIR_WARN("mdevctl failed to update mediated devices");
> -}
> -
> -
> -static void
> -launchMdevctlUpdateThread(int timer G_GNUC_UNUSED, void *opaque)
> +submitMdevctlUpdate(int timer G_GNUC_UNUSED, void *opaque)
>   {
>       udevEventData *priv = opaque;
> -    virThread thread;
>   
>       if (priv->mdevctlTimeout != -1) {
>           virEventRemoveTimeout(priv->mdevctlTimeout);
>           priv->mdevctlTimeout = -1;
>       }
>   
> -    if (virThreadCreateFull(&thread, false, mdevctlUpdateThreadFunc,
> -                            "mdevctl-thread", false, driver) < 0) {
> -        virReportSystemError(errno, "%s",
> -                             _("failed to create mdevctl thread"));
> -    }
> +    nodeDeviceEventSubmit(NODE_DEVICE_EVENT_MDEVCTL_CONFIG_CHANGED,  NULL, NULL);
>   }
>   
>   
> @@ -2170,7 +2213,7 @@ mdevctlEnableMonitor(udevEventData *priv)
>   /* Schedules an mdevctl update for 100ms in the future, canceling any existing
>    * timeout that may have been set. In this way, multiple update requests in
>    * quick succession can be collapsed into a single update. if @force is true,
> - * an update thread will be spawned immediately. */
> + * the worker job is submitted immediately. */
>   static void
>   scheduleMdevctlUpdate(udevEventData *data,
>                         bool force)
> @@ -2178,12 +2221,12 @@ scheduleMdevctlUpdate(udevEventData *data,
>       if (!force) {
>           if (data->mdevctlTimeout != -1)
>               virEventRemoveTimeout(data->mdevctlTimeout);
> -        data->mdevctlTimeout = virEventAddTimeout(100, launchMdevctlUpdateThread,
> +        data->mdevctlTimeout = virEventAddTimeout(100, submitMdevctlUpdate,
>                                                     data, NULL);
>           return;
>       }
>   
> -    launchMdevctlUpdateThread(-1, data);
> +    submitMdevctlUpdate(-1, data);
>   }
>   
>   
> @@ -2223,6 +2266,62 @@ mdevctlEventHandleCallback(GFileMonitor *monitor G_GNUC_UNUSED,
>   }
>   
>   
> +static void nodeDeviceEventHandler(void *data, void *opaque)
> +{
> +    virNodeDeviceDriverState *driver_state = opaque;
> +    g_autoptr(nodeDeviceEvent) processEvent = data;
> +
> +    switch (processEvent->eventType) {
> +    case NODE_DEVICE_EVENT_INIT:
> +    {
> +        struct udev *udev = processEvent->data;
> +
> +        processNodeStateInitializeEnumerate(driver_state, udev);
> +    }
> +    break;
> +    case NODE_DEVICE_EVENT_UDEV_ADD:
> +    case NODE_DEVICE_EVENT_UDEV_CHANGE:
> +    {
> +        struct udev_device *device = processEvent->data;
> +
> +        processNodeDeviceAddAndChangeEvent(driver_state, device);
> +    }
> +    break;
> +    case NODE_DEVICE_EVENT_UDEV_REMOVE:
> +    {
> +        struct udev_device *device = processEvent->data;
> +        const char *path = udev_device_get_syspath(device);
> +
> +        processNodeDeviceRemoveEvent(driver_state, path);
> +    }
> +    break;
> +    case NODE_DEVICE_EVENT_UDEV_MOVE:
> +    {
> +        struct udev_device *device = processEvent->data;
> +        const char *devpath_old = udevGetDeviceProperty(device, "DEVPATH_OLD");
> +
> +        if (devpath_old) {
> +            g_autofree char *devpath_old_fixed = g_strdup_printf("/sys%s", devpath_old);
> +
> +            processNodeDeviceRemoveEvent(driver_state, devpath_old_fixed);
> +        }
> +
> +        processNodeDeviceAddAndChangeEvent(driver_state, device);
> +    }
> +    break;
> +    case NODE_DEVICE_EVENT_MDEVCTL_CONFIG_CHANGED:
> +    {
> +        if (nodeDeviceUpdateMediatedDevices(driver_state) < 0)
> +            VIR_WARN("mdevctl failed to update mediated devices");
> +    }
> +    break;
> +    case NODE_DEVICE_EVENT_LAST:
> +        g_assert_not_reached();
> +        break;
> +    }
> +}
> +
> +
>   /* Note: It must be safe to call this function even if the driver was not
>    *       successfully initialized. This must be considered when changing this
>    *       function. */
> @@ -2258,6 +2357,9 @@ nodeStateShutdownPrepare(void)
>           priv->udevThreadQuit = true;
>           virCondSignal(&priv->udevThreadCond);
>       }
> +
> +    if (priv->workerPool)
> +        virThreadPoolStop(priv->workerPool);
>       return 0;
>   }
>   
> @@ -2278,11 +2380,19 @@ nodeStateShutdownWait(void)
>           return 0;
>   
>       VIR_WITH_OBJECT_LOCK_GUARD(priv) {
> -        if (priv->initThread)
> -            virThreadJoin(priv->initThread);
> -        if (priv->udevThread)
> -            virThreadJoin(priv->udevThread);
> +        if (priv->mdevctlTimeout != -1) {
> +            virEventRemoveTimeout(priv->mdevctlTimeout);
> +            priv->mdevctlTimeout = -1;
> +        }
> +
> +        if (priv->watch) {
> +            virEventRemoveHandle(priv->watch);
> +            priv->watch = -1;
> +        }
>       }
> +
> +    if (priv->workerPool)
> +        virThreadPoolDrain(priv->workerPool);
>       return 0;
>   }
>   
> @@ -2353,6 +2463,16 @@ nodeStateInitialize(bool privileged,
>       driver->parserCallbacks.postParse = nodeDeviceDefPostParse;
>       driver->parserCallbacks.validate = nodeDeviceDefValidate;
>   
> +    /* must be initialized before trying to reconnect to all the running mdevs
> +     * since there might occur some mdevctl monitor events that will be
> +     * dispatched to the worker pool */


Please also add the important information that the current implementation
supports only a single worker; this is what ensures that the order of udev
and libvirt events remains the same.


> +    priv->workerPool = virThreadPoolNewFull(1, 1, 0, nodeDeviceEventHandler,
> +                                            "nodev-device-event",
> +                                            NULL,
> +                                            driver);
> +    if (!priv->workerPool)
> +        goto unlock;
> +
>       if (udevPCITranslateInit(privileged) < 0)
>           goto unlock;
>   
> @@ -2410,13 +2530,7 @@ nodeStateInitialize(bool privileged,
>       if (udevSetupSystemDev() != 0)
>           goto cleanup;
>   
> -    priv->initThread = g_new0(virThread, 1);
> -    if (virThreadCreateFull(priv->initThread, true, nodeStateInitializeEnumerate,
> -                            "nodedev-init", false, udev) < 0) {
> -        virReportSystemError(errno, "%s",
> -                             _("failed to create udev enumerate thread"));
> -        goto cleanup;
> -    }
> +    nodeDeviceEventSubmit(NODE_DEVICE_EVENT_INIT, udev_ref(udev), (virFreeCallback)udev_unref);
>   
>       return VIR_DRV_STATE_INIT_COMPLETE;
>   

-- 
Mit freundlichen Grüßen/Kind regards
    Boris Fiuczynski

IBM Deutschland Research & Development GmbH
Vorsitzender des Aufsichtsrats: Wolfgang Wendt
Geschäftsführung: David Faller
Sitz der Gesellschaft: Böblingen
Registergericht: Amtsgericht Stuttgart, HRB 243294
_______________________________________________
Devel mailing list -- devel@lists.libvirt.org
To unsubscribe send an email to devel-leave@lists.libvirt.org
Re: [PATCH v1 16/20] node_device_udev: Use a worker pool for processing events and emitting nodedev event
Posted by Marc Hartmayer 1 year, 9 months ago
On Fri, Apr 19, 2024 at 04:49 PM +0200, Marc Hartmayer <mhartmay@linux.ibm.com> wrote:
> Use a worker pool for processing the events (e.g. udev, mdevctl config changes)
> and the initialization instead of a separate initThread and a mdevctl-thread.
> This has the large advantage that we can leverage the job API and now this
> thread pool is responsible to do all the "costly-work" and emitting the libvirt
> nodedev events.
>
> Signed-off-by: Marc Hartmayer <mhartmay@linux.ibm.com>
> ---

[…snip…]

>  
> +    /* must be initialized before trying to reconnect to all the running mdevs
> +     * since there might occur some mdevctl monitor events that will be
> +     * dispatched to the worker pool */
> +    priv->workerPool = virThreadPoolNewFull(1, 1, 0,
> nodeDeviceEventHandler,

The more I think about the number of workers in this pool, the more I'm
convinced that it's (currently) important to use only _one_ worker (1
udev thread <-> 1 worker), because otherwise we don't have any
guarantee that we comply with the following:

order(udev_events) == order(libvirt nodedev events)

And I guess we would like to fulfill this guarantee.

If you agree, then we should add a comment to the code and, if needed,
we can implement something for the case of more than one worker
(#count > 1).

[…snip]

-- 
Kind regards / Beste Grüße
   Marc Hartmayer

IBM Deutschland Research & Development GmbH
Vorsitzender des Aufsichtsrats: Wolfgang Wendt
Geschäftsführung: David Faller
Sitz der Gesellschaft: Böblingen
Registergericht: Amtsgericht Stuttgart, HRB 243294
_______________________________________________
Devel mailing list -- devel@lists.libvirt.org
To unsubscribe send an email to devel-leave@lists.libvirt.org
Re: [PATCH v1 16/20] node_device_udev: Use a worker pool for processing events and emitting nodedev event
Posted by Daniel P. Berrangé 1 year, 9 months ago
On Mon, Apr 22, 2024 at 02:45:52PM +0200, Marc Hartmayer wrote:
> On Fri, Apr 19, 2024 at 04:49 PM +0200, Marc Hartmayer <mhartmay@linux.ibm.com> wrote:
> > Use a worker pool for processing the events (e.g. udev, mdevctl config changes)
> > and the initialization instead of a separate initThread and a mdevctl-thread.
> > This has the large advantage that we can leverage the job API and now this
> > thread pool is responsible to do all the "costly-work" and emitting the libvirt
> > nodedev events.
> >
> > Signed-off-by: Marc Hartmayer <mhartmay@linux.ibm.com>
> > ---
> 
> […snip…]
> 
> >  
> > +    /* must be initialized before trying to reconnect to all the running mdevs
> > +     * since there might occur some mdevctl monitor events that will be
> > +     * dispatched to the worker pool */
> > +    priv->workerPool = virThreadPoolNewFull(1, 1, 0,
> > nodeDeviceEventHandler,
> 
> The more I think about the number of workers in this pool, the more I'm
> convinced that it's (currently) important to use only _one_ worker (1
> udev thread <-> 1 worker), because otherwise we don't have any
> guarantees that we comply to the following:
> 
> order(udev_events) == order(libvirt nodedev events)
> 
> And I guess we would like to fulfill this guarantee.

Yes, we need to preserve that ordering, otherwise the events become
largely unusable.

With regards,
Daniel
-- 
|: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org         -o-            https://fstop138.berrange.com :|
|: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|
_______________________________________________
Devel mailing list -- devel@lists.libvirt.org
To unsubscribe send an email to devel-leave@lists.libvirt.org
Re: [PATCH v1 16/20] node_device_udev: Use a worker pool for processing events and emitting nodedev event
Posted by Marc Hartmayer 1 year, 9 months ago
On Fri, Apr 19, 2024 at 04:49 PM +0200, Marc Hartmayer <mhartmay@linux.ibm.com> wrote:
> Use a worker pool for processing the events (e.g. udev, mdevctl config changes)
> and the initialization instead of a separate initThread and a mdevctl-thread.
> This has the large advantage that we can leverage the job API and now this
> thread pool is responsible to do all the "costly-work" and emitting the libvirt
> nodedev events.
>
> Signed-off-by: Marc Hartmayer <mhartmay@linux.ibm.com>
> ---
>  src/node_device/node_device_udev.c | 244 +++++++++++++++++++++--------
>  1 file changed, 179 insertions(+), 65 deletions(-)
>
> diff --git a/src/node_device/node_device_udev.c b/src/node_device/node_device_udev.c
> index e4b1532dc385..67a8b5cd7132 100644

[…snip…]

>  }
>  
> @@ -2278,11 +2380,19 @@ nodeStateShutdownWait(void)
>          return 0;
>  
>      VIR_WITH_OBJECT_LOCK_GUARD(priv) {
> -        if (priv->initThread)
> -            virThreadJoin(priv->initThread);
> -        if (priv->udevThread)
> -            virThreadJoin(priv->udevThread);

> +        if (priv->mdevctlTimeout != -1) {
> +            virEventRemoveTimeout(priv->mdevctlTimeout);
> +            priv->mdevctlTimeout = -1;
> +        }
> +
> +        if (priv->watch) {
> +            virEventRemoveHandle(priv->watch);
> +            priv->watch = -1;
> +        }

Too many rebases… the diff should read as follows:

@@ -2278,11 +2380,12 @@ nodeStateShutdownWait(void)
         return 0;

     VIR_WITH_OBJECT_LOCK_GUARD(priv) {
-        if (priv->initThread)
-            virThreadJoin(priv->initThread);
         if (priv->udevThread)
             virThreadJoin(priv->udevThread);
     }
+
+    if (priv->workerPool)
+        virThreadPoolDrain(priv->workerPool);
     return 0;
 }

[…snip]

-- 
Kind regards / Beste Grüße
   Marc Hartmayer

IBM Deutschland Research & Development GmbH
Vorsitzender des Aufsichtsrats: Wolfgang Wendt
Geschäftsführung: David Faller
Sitz der Gesellschaft: Böblingen
Registergericht: Amtsgericht Stuttgart, HRB 243294
_______________________________________________
Devel mailing list -- devel@lists.libvirt.org
To unsubscribe send an email to devel-leave@lists.libvirt.org
Re: [PATCH v1 16/20] node_device_udev: Use a worker pool for processing events and emitting nodedev event
Posted by Jonathon Jongsma 1 year, 9 months ago
On 4/19/24 11:04 AM, Marc Hartmayer wrote:
> On Fri, Apr 19, 2024 at 04:49 PM +0200, Marc Hartmayer <mhartmay@linux.ibm.com> wrote:
>> Use a worker pool for processing the events (e.g. udev, mdevctl config changes)
>> and the initialization instead of a separate initThread and a mdevctl-thread.
>> This has the large advantage that we can leverage the job API and now this
>> thread pool is responsible to do all the "costly-work" and emitting the libvirt
>> nodedev events.
>>
>> Signed-off-by: Marc Hartmayer <mhartmay@linux.ibm.com>
>> ---
>>   src/node_device/node_device_udev.c | 244 +++++++++++++++++++++--------
>>   1 file changed, 179 insertions(+), 65 deletions(-)
>>
>> diff --git a/src/node_device/node_device_udev.c b/src/node_device/node_device_udev.c
>> index e4b1532dc385..67a8b5cd7132 100644
> 
> […snip…]
> 
>>   }
>>   
>> @@ -2278,11 +2380,19 @@ nodeStateShutdownWait(void)
>>           return 0;
>>   
>>       VIR_WITH_OBJECT_LOCK_GUARD(priv) {
>> -        if (priv->initThread)
>> -            virThreadJoin(priv->initThread);
>> -        if (priv->udevThread)
>> -            virThreadJoin(priv->udevThread);
> 
>> +        if (priv->mdevctlTimeout != -1) {
>> +            virEventRemoveTimeout(priv->mdevctlTimeout);
>> +            priv->mdevctlTimeout = -1;
>> +        }
>> +
>> +        if (priv->watch) {
>> +            virEventRemoveHandle(priv->watch);
>> +            priv->watch = -1;
>> +        }
> 
> Too many rebases… the diff should read as follows:
> 
> @@ -2278,11 +2380,12 @@ nodeStateShutdownWait(void)
>           return 0;
> 
>       VIR_WITH_OBJECT_LOCK_GUARD(priv) {
> -        if (priv->initThread)
> -            virThreadJoin(priv->initThread);
>           if (priv->udevThread)
>               virThreadJoin(priv->udevThread);
>       }
> +
> +    if (priv->workerPool)
> +        virThreadPoolDrain(priv->workerPool);
>       return 0;
>   }
> 
> […snip]
> 



Reviewed-by: Jonathon Jongsma <jjongsma@redhat.com>
_______________________________________________
Devel mailing list -- devel@lists.libvirt.org
To unsubscribe send an email to devel-leave@lists.libvirt.org