Use a worker pool for processing the events (e.g. udev, mdevctl config changes)
and the initialization instead of a separate initThread and a mdevctl-thread.
This has the large advantage that we can leverage the job API, and this
thread pool is now responsible for doing all the "costly work" and for emitting
the libvirt nodedev events.
Signed-off-by: Marc Hartmayer <mhartmay@linux.ibm.com>
---
src/node_device/node_device_udev.c | 244 +++++++++++++++++++++--------
1 file changed, 179 insertions(+), 65 deletions(-)
diff --git a/src/node_device/node_device_udev.c b/src/node_device/node_device_udev.c
index e4b1532dc385..67a8b5cd7132 100644
--- a/src/node_device/node_device_udev.c
+++ b/src/node_device/node_device_udev.c
@@ -43,6 +43,7 @@
#include "virnetdev.h"
#include "virmdev.h"
#include "virutil.h"
+#include "virthreadpool.h"
#include "configmake.h"
@@ -69,13 +70,13 @@ struct _udevEventData {
bool udevThreadQuit;
bool udevDataReady;
- /* init thread */
- virThread *initThread;
-
/* Protects @mdevctlMonitors */
virMutex mdevctlLock;
GList *mdevctlMonitors;
int mdevctlTimeout;
+
+ /* Immutable pointer, self-locking APIs */
+ virThreadPool *workerPool;
};
static virClass *udevEventDataClass;
@@ -86,8 +87,6 @@ udevEventDataDispose(void *obj)
struct udev *udev = NULL;
udevEventData *priv = obj;
- g_clear_pointer(&priv->initThread, g_free);
-
VIR_WITH_MUTEX_LOCK_GUARD(&priv->mdevctlLock) {
g_list_free_full(g_steal_pointer(&priv->mdevctlMonitors), g_object_unref);
}
@@ -100,6 +99,8 @@ udevEventDataDispose(void *obj)
udev_unref(udev);
}
+ g_clear_pointer(&priv->workerPool, virThreadPoolFree);
+
virMutexDestroy(&priv->mdevctlLock);
virCondDestroy(&priv->udevThreadCond);
@@ -143,6 +144,66 @@ udevEventDataNew(void)
return ret;
}
+typedef enum {
+ NODE_DEVICE_EVENT_INIT = 0,
+ NODE_DEVICE_EVENT_UDEV_ADD,
+ NODE_DEVICE_EVENT_UDEV_REMOVE,
+ NODE_DEVICE_EVENT_UDEV_CHANGE,
+ NODE_DEVICE_EVENT_UDEV_MOVE,
+ NODE_DEVICE_EVENT_MDEVCTL_CONFIG_CHANGED,
+
+ NODE_DEVICE_EVENT_LAST
+} nodeDeviceEventType;
+
+struct _nodeDeviceEvent {
+ nodeDeviceEventType eventType;
+ void *data;
+ virFreeCallback dataFreeFunc;
+};
+typedef struct _nodeDeviceEvent nodeDeviceEvent;
+
+static void
+nodeDeviceEventFree(nodeDeviceEvent *event)
+{
+ if (!event)
+ return;
+
+ if (event->dataFreeFunc)
+ event->dataFreeFunc(event->data);
+ g_free(event);
+}
+G_DEFINE_AUTOPTR_CLEANUP_FUNC(nodeDeviceEvent, nodeDeviceEventFree);
+
+ /**
+ * nodeDeviceEventSubmit:
+ * @eventType: the event to be processed
+ * @data: additional data for the event processor (the pointer is stolen and it
+ * will be properly freed using @dataFreeFunc)
+ * @dataFreeFunc: callback to free @data
+ *
+ * Submits @eventType to be processed by the asynchronous event handling
+ * thread.
+ */
+static int nodeDeviceEventSubmit(nodeDeviceEventType eventType, void *data, virFreeCallback dataFreeFunc)
+{
+ nodeDeviceEvent *event = g_new0(nodeDeviceEvent, 1);
+ udevEventData *priv = NULL;
+
+ if (!driver)
+ return -1;
+
+ priv = driver->privateData;
+
+ event->eventType = eventType;
+ event->data = data;
+ event->dataFreeFunc = dataFreeFunc;
+ if (virThreadPoolSendJob(priv->workerPool, 0, event) < 0) {
+ nodeDeviceEventFree(event);
+ return -1;
+ }
+ return 0;
+}
+
static bool
udevHasDeviceProperty(struct udev_device *dev,
@@ -1447,7 +1508,7 @@ static void scheduleMdevctlUpdate(udevEventData *data, bool force);
static int
-udevRemoveOneDeviceSysPath(virNodeDeviceDriverState *driver_state, const char *path)
+processNodeDeviceRemoveEvent(virNodeDeviceDriverState *driver_state, const char *path)
{
virNodeDeviceObj *obj = NULL;
virNodeDeviceDef *def;
@@ -1529,7 +1590,7 @@ udevSetParent(virNodeDeviceDriverState *driver_state, struct udev_device *device
}
static int
-udevAddOneDevice(virNodeDeviceDriverState *driver_state, struct udev_device *device)
+processNodeDeviceAddAndChangeEvent(virNodeDeviceDriverState *driver_state, struct udev_device *device)
{
g_autofree char *sysfs_path = NULL;
virNodeDeviceDef *def = NULL;
@@ -1647,7 +1708,7 @@ udevProcessDeviceListEntry(virNodeDeviceDriverState *driver_state, struct udev *
device = udev_device_new_from_syspath(udev, name);
if (device != NULL) {
- if (udevAddOneDevice(driver_state, device) != 0) {
+ if (processNodeDeviceAddAndChangeEvent(driver_state, device) != 0) {
VIR_DEBUG("Failed to create node device for udev device '%s'",
name);
}
@@ -1755,26 +1816,23 @@ udevHandleOneDevice(struct udev_device *device)
VIR_DEBUG("udev action: '%s': %s", action, udev_device_get_syspath(device));
- if (STREQ(action, "add") || STREQ(action, "change"))
- return udevAddOneDevice(driver, device);
-
- if (STREQ(action, "remove")) {
- const char *path = udev_device_get_syspath(device);
-
- return udevRemoveOneDeviceSysPath(driver, path);
- }
-
- if (STREQ(action, "move")) {
- const char *devpath_old = udevGetDeviceProperty(device, "DEVPATH_OLD");
-
- if (devpath_old) {
- g_autofree char *devpath_old_fixed = g_strdup_printf("/sys%s", devpath_old);
-
- udevRemoveOneDeviceSysPath(driver, devpath_old_fixed);
- }
-
- return udevAddOneDevice(driver, device);
+ /* Reference is either released via workerpool logic or at the end of this
+ * function. */
+ device = udev_device_ref(device);
+ if (STREQ(action, "add")) {
+ return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_ADD, device,
+ (virFreeCallback)udev_device_unref);
+ } else if (STREQ(action, "change")) {
+ return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_CHANGE, device,
+ (virFreeCallback)udev_device_unref);
+ } else if (STREQ(action, "remove")) {
+ return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_REMOVE, device,
+ (virFreeCallback)udev_device_unref);
+ } else if (STREQ(action, "move")) {
+ return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_MOVE, device,
+ (virFreeCallback)udev_device_unref);
}
+ udev_device_unref(device);
return 0;
}
@@ -1993,23 +2051,23 @@ udevSetupSystemDev(void)
static void
-nodeStateInitializeEnumerate(void *opaque)
+processNodeStateInitializeEnumerate(virNodeDeviceDriverState *driver_state, void *opaque)
{
- udevEventData *priv = driver->privateData;
+ udevEventData *priv = driver_state->privateData;
struct udev *udev = opaque;
/* Populate with known devices */
- if (udevEnumerateDevices(driver, udev) != 0)
+ if (udevEnumerateDevices(driver_state, udev) != 0)
goto error;
/* Load persistent mdevs (which might not be activated yet) and additional
* information about active mediated devices from mdevctl */
- if (nodeDeviceUpdateMediatedDevices(driver) != 0)
+ if (nodeDeviceUpdateMediatedDevices(driver_state) != 0)
goto error;
cleanup:
- VIR_WITH_MUTEX_LOCK_GUARD(&driver->lock) {
- driver->initialized = true;
- virCondBroadcast(&driver->initCond);
+ VIR_WITH_MUTEX_LOCK_GUARD(&driver_state->lock) {
+ driver_state->initialized = true;
+ virCondBroadcast(&driver_state->initCond);
}
return;
@@ -2051,31 +2109,16 @@ udevPCITranslateInit(bool privileged G_GNUC_UNUSED)
static void
-mdevctlUpdateThreadFunc(void *opaque)
-{
- virNodeDeviceDriverState *driver_state = opaque;
-
- if (nodeDeviceUpdateMediatedDevices(driver_state) < 0)
- VIR_WARN("mdevctl failed to update mediated devices");
-}
-
-
-static void
-launchMdevctlUpdateThread(int timer G_GNUC_UNUSED, void *opaque)
+submitMdevctlUpdate(int timer G_GNUC_UNUSED, void *opaque)
{
udevEventData *priv = opaque;
- virThread thread;
if (priv->mdevctlTimeout != -1) {
virEventRemoveTimeout(priv->mdevctlTimeout);
priv->mdevctlTimeout = -1;
}
- if (virThreadCreateFull(&thread, false, mdevctlUpdateThreadFunc,
- "mdevctl-thread", false, driver) < 0) {
- virReportSystemError(errno, "%s",
- _("failed to create mdevctl thread"));
- }
+ nodeDeviceEventSubmit(NODE_DEVICE_EVENT_MDEVCTL_CONFIG_CHANGED, NULL, NULL);
}
@@ -2170,7 +2213,7 @@ mdevctlEnableMonitor(udevEventData *priv)
/* Schedules an mdevctl update for 100ms in the future, canceling any existing
* timeout that may have been set. In this way, multiple update requests in
* quick succession can be collapsed into a single update. if @force is true,
- * an update thread will be spawned immediately. */
+ * the worker job is submitted immediately. */
static void
scheduleMdevctlUpdate(udevEventData *data,
bool force)
@@ -2178,12 +2221,12 @@ scheduleMdevctlUpdate(udevEventData *data,
if (!force) {
if (data->mdevctlTimeout != -1)
virEventRemoveTimeout(data->mdevctlTimeout);
- data->mdevctlTimeout = virEventAddTimeout(100, launchMdevctlUpdateThread,
+ data->mdevctlTimeout = virEventAddTimeout(100, submitMdevctlUpdate,
data, NULL);
return;
}
- launchMdevctlUpdateThread(-1, data);
+ submitMdevctlUpdate(-1, data);
}
@@ -2223,6 +2266,62 @@ mdevctlEventHandleCallback(GFileMonitor *monitor G_GNUC_UNUSED,
}
+static void nodeDeviceEventHandler(void *data, void *opaque)
+{
+ virNodeDeviceDriverState *driver_state = opaque;
+ g_autoptr(nodeDeviceEvent) processEvent = data;
+
+ switch (processEvent->eventType) {
+ case NODE_DEVICE_EVENT_INIT:
+ {
+ struct udev *udev = processEvent->data;
+
+ processNodeStateInitializeEnumerate(driver_state, udev);
+ }
+ break;
+ case NODE_DEVICE_EVENT_UDEV_ADD:
+ case NODE_DEVICE_EVENT_UDEV_CHANGE:
+ {
+ struct udev_device *device = processEvent->data;
+
+ processNodeDeviceAddAndChangeEvent(driver_state, device);
+ }
+ break;
+ case NODE_DEVICE_EVENT_UDEV_REMOVE:
+ {
+ struct udev_device *device = processEvent->data;
+ const char *path = udev_device_get_syspath(device);
+
+ processNodeDeviceRemoveEvent(driver_state, path);
+ }
+ break;
+ case NODE_DEVICE_EVENT_UDEV_MOVE:
+ {
+ struct udev_device *device = processEvent->data;
+ const char *devpath_old = udevGetDeviceProperty(device, "DEVPATH_OLD");
+
+ if (devpath_old) {
+ g_autofree char *devpath_old_fixed = g_strdup_printf("/sys%s", devpath_old);
+
+ processNodeDeviceRemoveEvent(driver_state, devpath_old_fixed);
+ }
+
+ processNodeDeviceAddAndChangeEvent(driver_state, device);
+ }
+ break;
+ case NODE_DEVICE_EVENT_MDEVCTL_CONFIG_CHANGED:
+ {
+ if (nodeDeviceUpdateMediatedDevices(driver_state) < 0)
+ VIR_WARN("mdevctl failed to update mediated devices");
+ }
+ break;
+ case NODE_DEVICE_EVENT_LAST:
+ g_assert_not_reached();
+ break;
+ }
+}
+
+
/* Note: It must be safe to call this function even if the driver was not
* successfully initialized. This must be considered when changing this
* function. */
@@ -2258,6 +2357,9 @@ nodeStateShutdownPrepare(void)
priv->udevThreadQuit = true;
virCondSignal(&priv->udevThreadCond);
}
+
+ if (priv->workerPool)
+ virThreadPoolStop(priv->workerPool);
return 0;
}
@@ -2278,11 +2380,19 @@ nodeStateShutdownWait(void)
return 0;
VIR_WITH_OBJECT_LOCK_GUARD(priv) {
- if (priv->initThread)
- virThreadJoin(priv->initThread);
- if (priv->udevThread)
- virThreadJoin(priv->udevThread);
+ if (priv->mdevctlTimeout != -1) {
+ virEventRemoveTimeout(priv->mdevctlTimeout);
+ priv->mdevctlTimeout = -1;
+ }
+
+ if (priv->watch) {
+ virEventRemoveHandle(priv->watch);
+ priv->watch = -1;
+ }
}
+
+ if (priv->workerPool)
+ virThreadPoolDrain(priv->workerPool);
return 0;
}
@@ -2353,6 +2463,16 @@ nodeStateInitialize(bool privileged,
driver->parserCallbacks.postParse = nodeDeviceDefPostParse;
driver->parserCallbacks.validate = nodeDeviceDefValidate;
+ /* must be initialized before trying to reconnect to all the running mdevs
+ * since there might occur some mdevctl monitor events that will be
+ * dispatched to the worker pool */
+ priv->workerPool = virThreadPoolNewFull(1, 1, 0, nodeDeviceEventHandler,
+ "nodev-device-event",
+ NULL,
+ driver);
+ if (!priv->workerPool)
+ goto unlock;
+
if (udevPCITranslateInit(privileged) < 0)
goto unlock;
@@ -2410,13 +2530,7 @@ nodeStateInitialize(bool privileged,
if (udevSetupSystemDev() != 0)
goto cleanup;
- priv->initThread = g_new0(virThread, 1);
- if (virThreadCreateFull(priv->initThread, true, nodeStateInitializeEnumerate,
- "nodedev-init", false, udev) < 0) {
- virReportSystemError(errno, "%s",
- _("failed to create udev enumerate thread"));
- goto cleanup;
- }
+ nodeDeviceEventSubmit(NODE_DEVICE_EVENT_INIT, udev_ref(udev), (virFreeCallback)udev_unref);
return VIR_DRV_STATE_INIT_COMPLETE;
--
2.34.1
_______________________________________________
Devel mailing list -- devel@lists.libvirt.org
To unsubscribe send an email to devel-leave@lists.libvirt.org
On Fri, Apr 19, 2024 at 04:49 PM +0200, Marc Hartmayer <mhartmay@linux.ibm.com> wrote:
> Use a worker pool for processing the events (e.g. udev, mdevctl config changes)
> and the initialization instead of a separate initThread and a mdevctl-thread.
> This has the large advantage that we can leverage the job API and now this
> thread pool is responsible to do all the "costly-work" and emitting the libvirt
> nodedev events.
>
> Signed-off-by: Marc Hartmayer <mhartmay@linux.ibm.com>
> ---
[…snip…]
>
>
> +static void nodeDeviceEventHandler(void *data, void *opaque)
> +{
> + virNodeDeviceDriverState *driver_state = opaque;
> + g_autoptr(nodeDeviceEvent) processEvent = data;
> +
> + switch (processEvent->eventType) {
> + case NODE_DEVICE_EVENT_INIT:
> + {
> + struct udev *udev = processEvent->data;
> +
> + processNodeStateInitializeEnumerate(driver_state, udev);
> + }
> + break;
> + case NODE_DEVICE_EVENT_UDEV_ADD:
> + case NODE_DEVICE_EVENT_UDEV_CHANGE:
> + {
> + struct udev_device *device = processEvent->data;
> +
> + processNodeDeviceAddAndChangeEvent(driver_state, device);
> + }
> + break;
> + case NODE_DEVICE_EVENT_UDEV_REMOVE:
> + {
> + struct udev_device *device = processEvent->data;
> + const char *path = udev_device_get_syspath(device);
> +
> + processNodeDeviceRemoveEvent(driver_state, path);
> + }
> + break;
> + case NODE_DEVICE_EVENT_UDEV_MOVE:
> + {
> + struct udev_device *device = processEvent->data;
> + const char *devpath_old = udevGetDeviceProperty(device, "DEVPATH_OLD");
> +
> + if (devpath_old) {
> + g_autofree char *devpath_old_fixed = g_strdup_printf("/sys%s", devpath_old);
> +
> + processNodeDeviceRemoveEvent(driver_state, devpath_old_fixed);
> + }
> +
> + processNodeDeviceAddAndChangeEvent(driver_state, device);
> + }
> + break;
> + case NODE_DEVICE_EVENT_MDEVCTL_CONFIG_CHANGED:
> + {
> + if (nodeDeviceUpdateMediatedDevices(driver_state) < 0)
> + VIR_WARN("mdevctl failed to update mediated devices");
> + }
> + break;
> + case NODE_DEVICE_EVENT_LAST:
> + g_assert_not_reached();
The assert statement should be replaced with:
virReportEnumRangeError(nodeDeviceEventType, processEvent->eventType);
> + break;
> + }
> +}
[…snip]
--
Kind regards / Beste Grüße
Marc Hartmayer
IBM Deutschland Research & Development GmbH
Vorsitzender des Aufsichtsrats: Wolfgang Wendt
Geschäftsführung: David Faller
Sitz der Gesellschaft: Böblingen
Registergericht: Amtsgericht Stuttgart, HRB 243294
_______________________________________________
Devel mailing list -- devel@lists.libvirt.org
To unsubscribe send an email to devel-leave@lists.libvirt.org
With the "too many rebases" hunk in nodeStateShutdownWait() corrected and the
additional comment added that explains why currently only one worker must be
used:
Reviewed-by: Boris Fiuczynski <fiuczy@linux.ibm.com>
On 4/19/24 16:49, Marc Hartmayer wrote:
> Use a worker pool for processing the events (e.g. udev, mdevctl config changes)
> and the initialization instead of a separate initThread and a mdevctl-thread.
> This has the large advantage that we can leverage the job API and now this
> thread pool is responsible to do all the "costly-work" and emitting the libvirt
> nodedev events.
>
> Signed-off-by: Marc Hartmayer <mhartmay@linux.ibm.com>
> ---
> src/node_device/node_device_udev.c | 244 +++++++++++++++++++++--------
> 1 file changed, 179 insertions(+), 65 deletions(-)
>
> diff --git a/src/node_device/node_device_udev.c b/src/node_device/node_device_udev.c
> index e4b1532dc385..67a8b5cd7132 100644
> --- a/src/node_device/node_device_udev.c
> +++ b/src/node_device/node_device_udev.c
> @@ -43,6 +43,7 @@
> #include "virnetdev.h"
> #include "virmdev.h"
> #include "virutil.h"
> +#include "virthreadpool.h"
>
> #include "configmake.h"
>
> @@ -69,13 +70,13 @@ struct _udevEventData {
> bool udevThreadQuit;
> bool udevDataReady;
>
> - /* init thread */
> - virThread *initThread;
> -
> /* Protects @mdevctlMonitors */
> virMutex mdevctlLock;
> GList *mdevctlMonitors;
> int mdevctlTimeout;
> +
> + /* Immutable pointer, self-locking APIs */
> + virThreadPool *workerPool;
> };
>
> static virClass *udevEventDataClass;
> @@ -86,8 +87,6 @@ udevEventDataDispose(void *obj)
> struct udev *udev = NULL;
> udevEventData *priv = obj;
>
> - g_clear_pointer(&priv->initThread, g_free);
> -
> VIR_WITH_MUTEX_LOCK_GUARD(&priv->mdevctlLock) {
> g_list_free_full(g_steal_pointer(&priv->mdevctlMonitors), g_object_unref);
> }
> @@ -100,6 +99,8 @@ udevEventDataDispose(void *obj)
> udev_unref(udev);
> }
>
> + g_clear_pointer(&priv->workerPool, virThreadPoolFree);
> +
> virMutexDestroy(&priv->mdevctlLock);
>
> virCondDestroy(&priv->udevThreadCond);
> @@ -143,6 +144,66 @@ udevEventDataNew(void)
> return ret;
> }
>
> +typedef enum {
> + NODE_DEVICE_EVENT_INIT = 0,
> + NODE_DEVICE_EVENT_UDEV_ADD,
> + NODE_DEVICE_EVENT_UDEV_REMOVE,
> + NODE_DEVICE_EVENT_UDEV_CHANGE,
> + NODE_DEVICE_EVENT_UDEV_MOVE,
> + NODE_DEVICE_EVENT_MDEVCTL_CONFIG_CHANGED,
> +
> + NODE_DEVICE_EVENT_LAST
> +} nodeDeviceEventType;
> +
> +struct _nodeDeviceEvent {
> + nodeDeviceEventType eventType;
> + void *data;
> + virFreeCallback dataFreeFunc;
> +};
> +typedef struct _nodeDeviceEvent nodeDeviceEvent;
> +
> +static void
> +nodeDeviceEventFree(nodeDeviceEvent *event)
> +{
> + if (!event)
> + return;
> +
> + if (event->dataFreeFunc)
> + event->dataFreeFunc(event->data);
> + g_free(event);
> +}
> +G_DEFINE_AUTOPTR_CLEANUP_FUNC(nodeDeviceEvent, nodeDeviceEventFree);
> +
> + /**
> + * nodeDeviceEventSubmit:
> + * @eventType: the event to be processed
> + * @data: additional data for the event processor (the pointer is stolen and it
> + * will be properly freed using @dataFreeFunc)
> + * @dataFreeFunc: callback to free @data
> + *
> + * Submits @eventType to be processed by the asynchronous event handling
> + * thread.
> + */
> +static int nodeDeviceEventSubmit(nodeDeviceEventType eventType, void *data, virFreeCallback dataFreeFunc)
> +{
> + nodeDeviceEvent *event = g_new0(nodeDeviceEvent, 1);
> + udevEventData *priv = NULL;
> +
> + if (!driver)
> + return -1;
> +
> + priv = driver->privateData;
> +
> + event->eventType = eventType;
> + event->data = data;
> + event->dataFreeFunc = dataFreeFunc;
> + if (virThreadPoolSendJob(priv->workerPool, 0, event) < 0) {
> + nodeDeviceEventFree(event);
> + return -1;
> + }
> + return 0;
> +}
> +
>
> static bool
> udevHasDeviceProperty(struct udev_device *dev,
> @@ -1447,7 +1508,7 @@ static void scheduleMdevctlUpdate(udevEventData *data, bool force);
>
>
> static int
> -udevRemoveOneDeviceSysPath(virNodeDeviceDriverState *driver_state, const char *path)
> +processNodeDeviceRemoveEvent(virNodeDeviceDriverState *driver_state, const char *path)
> {
> virNodeDeviceObj *obj = NULL;
> virNodeDeviceDef *def;
> @@ -1529,7 +1590,7 @@ udevSetParent(virNodeDeviceDriverState *driver_state, struct udev_device *device
> }
>
> static int
> -udevAddOneDevice(virNodeDeviceDriverState *driver_state, struct udev_device *device)
> +processNodeDeviceAddAndChangeEvent(virNodeDeviceDriverState *driver_state, struct udev_device *device)
> {
> g_autofree char *sysfs_path = NULL;
> virNodeDeviceDef *def = NULL;
> @@ -1647,7 +1708,7 @@ udevProcessDeviceListEntry(virNodeDeviceDriverState *driver_state, struct udev *
> device = udev_device_new_from_syspath(udev, name);
>
> if (device != NULL) {
> - if (udevAddOneDevice(driver_state, device) != 0) {
> + if (processNodeDeviceAddAndChangeEvent(driver_state, device) != 0) {
> VIR_DEBUG("Failed to create node device for udev device '%s'",
> name);
> }
> @@ -1755,26 +1816,23 @@ udevHandleOneDevice(struct udev_device *device)
>
> VIR_DEBUG("udev action: '%s': %s", action, udev_device_get_syspath(device));
>
> - if (STREQ(action, "add") || STREQ(action, "change"))
> - return udevAddOneDevice(driver, device);
> -
> - if (STREQ(action, "remove")) {
> - const char *path = udev_device_get_syspath(device);
> -
> - return udevRemoveOneDeviceSysPath(driver, path);
> - }
> -
> - if (STREQ(action, "move")) {
> - const char *devpath_old = udevGetDeviceProperty(device, "DEVPATH_OLD");
> -
> - if (devpath_old) {
> - g_autofree char *devpath_old_fixed = g_strdup_printf("/sys%s", devpath_old);
> -
> - udevRemoveOneDeviceSysPath(driver, devpath_old_fixed);
> - }
> -
> - return udevAddOneDevice(driver, device);
> + /* Reference is either released via workerpool logic or at the end of this
> + * function. */
> + device = udev_device_ref(device);
> + if (STREQ(action, "add")) {
> + return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_ADD, device,
> + (virFreeCallback)udev_device_unref);
> + } else if (STREQ(action, "change")) {
> + return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_CHANGE, device,
> + (virFreeCallback)udev_device_unref);
> + } else if (STREQ(action, "remove")) {
> + return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_REMOVE, device,
> + (virFreeCallback)udev_device_unref);
> + } else if (STREQ(action, "move")) {
> + return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_MOVE, device,
> + (virFreeCallback)udev_device_unref);
> }
> + udev_device_unref(device);
>
> return 0;
> }
> @@ -1993,23 +2051,23 @@ udevSetupSystemDev(void)
>
>
> static void
> -nodeStateInitializeEnumerate(void *opaque)
> +processNodeStateInitializeEnumerate(virNodeDeviceDriverState *driver_state, void *opaque)
> {
> - udevEventData *priv = driver->privateData;
> + udevEventData *priv = driver_state->privateData;
> struct udev *udev = opaque;
>
> /* Populate with known devices */
> - if (udevEnumerateDevices(driver, udev) != 0)
> + if (udevEnumerateDevices(driver_state, udev) != 0)
> goto error;
> /* Load persistent mdevs (which might not be activated yet) and additional
> * information about active mediated devices from mdevctl */
> - if (nodeDeviceUpdateMediatedDevices(driver) != 0)
> + if (nodeDeviceUpdateMediatedDevices(driver_state) != 0)
> goto error;
>
> cleanup:
> - VIR_WITH_MUTEX_LOCK_GUARD(&driver->lock) {
> - driver->initialized = true;
> - virCondBroadcast(&driver->initCond);
> + VIR_WITH_MUTEX_LOCK_GUARD(&driver_state->lock) {
> + driver_state->initialized = true;
> + virCondBroadcast(&driver_state->initCond);
> }
>
> return;
> @@ -2051,31 +2109,16 @@ udevPCITranslateInit(bool privileged G_GNUC_UNUSED)
>
>
> static void
> -mdevctlUpdateThreadFunc(void *opaque)
> -{
> - virNodeDeviceDriverState *driver_state = opaque;
> -
> - if (nodeDeviceUpdateMediatedDevices(driver_state) < 0)
> - VIR_WARN("mdevctl failed to update mediated devices");
> -}
> -
> -
> -static void
> -launchMdevctlUpdateThread(int timer G_GNUC_UNUSED, void *opaque)
> +submitMdevctlUpdate(int timer G_GNUC_UNUSED, void *opaque)
> {
> udevEventData *priv = opaque;
> - virThread thread;
>
> if (priv->mdevctlTimeout != -1) {
> virEventRemoveTimeout(priv->mdevctlTimeout);
> priv->mdevctlTimeout = -1;
> }
>
> - if (virThreadCreateFull(&thread, false, mdevctlUpdateThreadFunc,
> - "mdevctl-thread", false, driver) < 0) {
> - virReportSystemError(errno, "%s",
> - _("failed to create mdevctl thread"));
> - }
> + nodeDeviceEventSubmit(NODE_DEVICE_EVENT_MDEVCTL_CONFIG_CHANGED, NULL, NULL);
> }
>
>
> @@ -2170,7 +2213,7 @@ mdevctlEnableMonitor(udevEventData *priv)
> /* Schedules an mdevctl update for 100ms in the future, canceling any existing
> * timeout that may have been set. In this way, multiple update requests in
> * quick succession can be collapsed into a single update. if @force is true,
> - * an update thread will be spawned immediately. */
> + * the worker job is submitted immediately. */
> static void
> scheduleMdevctlUpdate(udevEventData *data,
> bool force)
> @@ -2178,12 +2221,12 @@ scheduleMdevctlUpdate(udevEventData *data,
> if (!force) {
> if (data->mdevctlTimeout != -1)
> virEventRemoveTimeout(data->mdevctlTimeout);
> - data->mdevctlTimeout = virEventAddTimeout(100, launchMdevctlUpdateThread,
> + data->mdevctlTimeout = virEventAddTimeout(100, submitMdevctlUpdate,
> data, NULL);
> return;
> }
>
> - launchMdevctlUpdateThread(-1, data);
> + submitMdevctlUpdate(-1, data);
> }
>
>
> @@ -2223,6 +2266,62 @@ mdevctlEventHandleCallback(GFileMonitor *monitor G_GNUC_UNUSED,
> }
>
>
> +static void nodeDeviceEventHandler(void *data, void *opaque)
> +{
> + virNodeDeviceDriverState *driver_state = opaque;
> + g_autoptr(nodeDeviceEvent) processEvent = data;
> +
> + switch (processEvent->eventType) {
> + case NODE_DEVICE_EVENT_INIT:
> + {
> + struct udev *udev = processEvent->data;
> +
> + processNodeStateInitializeEnumerate(driver_state, udev);
> + }
> + break;
> + case NODE_DEVICE_EVENT_UDEV_ADD:
> + case NODE_DEVICE_EVENT_UDEV_CHANGE:
> + {
> + struct udev_device *device = processEvent->data;
> +
> + processNodeDeviceAddAndChangeEvent(driver_state, device);
> + }
> + break;
> + case NODE_DEVICE_EVENT_UDEV_REMOVE:
> + {
> + struct udev_device *device = processEvent->data;
> + const char *path = udev_device_get_syspath(device);
> +
> + processNodeDeviceRemoveEvent(driver_state, path);
> + }
> + break;
> + case NODE_DEVICE_EVENT_UDEV_MOVE:
> + {
> + struct udev_device *device = processEvent->data;
> + const char *devpath_old = udevGetDeviceProperty(device, "DEVPATH_OLD");
> +
> + if (devpath_old) {
> + g_autofree char *devpath_old_fixed = g_strdup_printf("/sys%s", devpath_old);
> +
> + processNodeDeviceRemoveEvent(driver_state, devpath_old_fixed);
> + }
> +
> + processNodeDeviceAddAndChangeEvent(driver_state, device);
> + }
> + break;
> + case NODE_DEVICE_EVENT_MDEVCTL_CONFIG_CHANGED:
> + {
> + if (nodeDeviceUpdateMediatedDevices(driver_state) < 0)
> + VIR_WARN("mdevctl failed to update mediated devices");
> + }
> + break;
> + case NODE_DEVICE_EVENT_LAST:
> + g_assert_not_reached();
> + break;
> + }
> +}
> +
> +
> /* Note: It must be safe to call this function even if the driver was not
> * successfully initialized. This must be considered when changing this
> * function. */
> @@ -2258,6 +2357,9 @@ nodeStateShutdownPrepare(void)
> priv->udevThreadQuit = true;
> virCondSignal(&priv->udevThreadCond);
> }
> +
> + if (priv->workerPool)
> + virThreadPoolStop(priv->workerPool);
> return 0;
> }
>
> @@ -2278,11 +2380,19 @@ nodeStateShutdownWait(void)
> return 0;
>
> VIR_WITH_OBJECT_LOCK_GUARD(priv) {
> - if (priv->initThread)
> - virThreadJoin(priv->initThread);
> - if (priv->udevThread)
> - virThreadJoin(priv->udevThread);
> + if (priv->mdevctlTimeout != -1) {
> + virEventRemoveTimeout(priv->mdevctlTimeout);
> + priv->mdevctlTimeout = -1;
> + }
> +
> + if (priv->watch) {
> + virEventRemoveHandle(priv->watch);
> + priv->watch = -1;
> + }
> }
> +
> + if (priv->workerPool)
> + virThreadPoolDrain(priv->workerPool);
> return 0;
> }
>
> @@ -2353,6 +2463,16 @@ nodeStateInitialize(bool privileged,
> driver->parserCallbacks.postParse = nodeDeviceDefPostParse;
> driver->parserCallbacks.validate = nodeDeviceDefValidate;
>
> + /* must be initialized before trying to reconnect to all the running mdevs
> + * since there might occur some mdevctl monitor events that will be
> + * dispatched to the worker pool */
Please also add the important information that the current implementation
supports only one worker, which ensures that udev events and the resulting
libvirt events stay in the same order.
> + priv->workerPool = virThreadPoolNewFull(1, 1, 0, nodeDeviceEventHandler,
> + "nodev-device-event",
> + NULL,
> + driver);
> + if (!priv->workerPool)
> + goto unlock;
> +
> if (udevPCITranslateInit(privileged) < 0)
> goto unlock;
>
> @@ -2410,13 +2530,7 @@ nodeStateInitialize(bool privileged,
> if (udevSetupSystemDev() != 0)
> goto cleanup;
>
> - priv->initThread = g_new0(virThread, 1);
> - if (virThreadCreateFull(priv->initThread, true, nodeStateInitializeEnumerate,
> - "nodedev-init", false, udev) < 0) {
> - virReportSystemError(errno, "%s",
> - _("failed to create udev enumerate thread"));
> - goto cleanup;
> - }
> + nodeDeviceEventSubmit(NODE_DEVICE_EVENT_INIT, udev_ref(udev), (virFreeCallback)udev_unref);
>
> return VIR_DRV_STATE_INIT_COMPLETE;
>
--
Mit freundlichen Grüßen/Kind regards
Boris Fiuczynski
IBM Deutschland Research & Development GmbH
Vorsitzender des Aufsichtsrats: Wolfgang Wendt
Geschäftsführung: David Faller
Sitz der Gesellschaft: Böblingen
Registergericht: Amtsgericht Stuttgart, HRB 243294
_______________________________________________
Devel mailing list -- devel@lists.libvirt.org
To unsubscribe send an email to devel-leave@lists.libvirt.org
On Fri, Apr 19, 2024 at 04:49 PM +0200, Marc Hartmayer <mhartmay@linux.ibm.com> wrote:
> Use a worker pool for processing the events (e.g. udev, mdevctl config changes)
> and the initialization instead of a separate initThread and a mdevctl-thread.
> This has the large advantage that we can leverage the job API and now this
> thread pool is responsible to do all the "costly-work" and emitting the libvirt
> nodedev events.
>
> Signed-off-by: Marc Hartmayer <mhartmay@linux.ibm.com>
> ---
[…snip…]
>
> + /* must be initialized before trying to reconnect to all the running mdevs
> + * since there might occur some mdevctl monitor events that will be
> + * dispatched to the worker pool */
> + priv->workerPool = virThreadPoolNewFull(1, 1, 0,
> nodeDeviceEventHandler,
The more I think about the number of workers in this pool, the more I'm
convinced that it's (currently) important to use only _one_ worker (1
udev thread <-> 1 worker), because otherwise we don't have any
guarantees that we comply to the following:
order(udev_events) == order(libvirt nodedev events)
And I guess we would like to fulfill this guarantee. If you agree, then
we should add a comment to the code and if needed we can implement
something for the case #count > 1.
[…snip]
--
Kind regards / Beste Grüße
Marc Hartmayer
IBM Deutschland Research & Development GmbH
Vorsitzender des Aufsichtsrats: Wolfgang Wendt
Geschäftsführung: David Faller
Sitz der Gesellschaft: Böblingen
Registergericht: Amtsgericht Stuttgart, HRB 243294
_______________________________________________
Devel mailing list -- devel@lists.libvirt.org
To unsubscribe send an email to devel-leave@lists.libvirt.org
On Mon, Apr 22, 2024 at 02:45:52PM +0200, Marc Hartmayer wrote:
> On Fri, Apr 19, 2024 at 04:49 PM +0200, Marc Hartmayer <mhartmay@linux.ibm.com> wrote:
> > Use a worker pool for processing the events (e.g. udev, mdevctl config changes)
> > and the initialization instead of a separate initThread and a mdevctl-thread.
> > This has the large advantage that we can leverage the job API and now this
> > thread pool is responsible to do all the "costly-work" and emitting the libvirt
> > nodedev events.
> >
> > Signed-off-by: Marc Hartmayer <mhartmay@linux.ibm.com>
> > ---
>
> […snip…]
>
> >
> > + /* must be initialized before trying to reconnect to all the running mdevs
> > + * since there might occur some mdevctl monitor events that will be
> > + * dispatched to the worker pool */
> > + priv->workerPool = virThreadPoolNewFull(1, 1, 0,
> > nodeDeviceEventHandler,
>
> The more I think about the number of workers in this pool, the more I'm
> convinced that it's (currently) important to use only _one_ worker (1
> udev thread <-> 1 worker), because otherwise we don't have any
> guarantees that we comply to the following:
>
> order(udev_events) == order(libvirt nodedev events)
>
> And I guess we would like to fulfill this guarantee.
Yes, we need to preserve that ordering, otherwise the events become
largely unusable.
With regards,
Daniel
--
|: https://berrange.com -o- https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org -o- https://fstop138.berrange.com :|
|: https://entangle-photo.org -o- https://www.instagram.com/dberrange :|
_______________________________________________
Devel mailing list -- devel@lists.libvirt.org
To unsubscribe send an email to devel-leave@lists.libvirt.org
On Fri, Apr 19, 2024 at 04:49 PM +0200, Marc Hartmayer <mhartmay@linux.ibm.com> wrote:
> Use a worker pool for processing the events (e.g. udev, mdevctl config changes)
> and the initialization instead of a separate initThread and a mdevctl-thread.
> This has the large advantage that we can leverage the job API and now this
> thread pool is responsible to do all the "costly-work" and emitting the libvirt
> nodedev events.
>
> Signed-off-by: Marc Hartmayer <mhartmay@linux.ibm.com>
> ---
> src/node_device/node_device_udev.c | 244 +++++++++++++++++++++--------
> 1 file changed, 179 insertions(+), 65 deletions(-)
>
> diff --git a/src/node_device/node_device_udev.c b/src/node_device/node_device_udev.c
> index e4b1532dc385..67a8b5cd7132 100644
[…snip…]
> }
>
> @@ -2278,11 +2380,19 @@ nodeStateShutdownWait(void)
> return 0;
>
> VIR_WITH_OBJECT_LOCK_GUARD(priv) {
> - if (priv->initThread)
> - virThreadJoin(priv->initThread);
> - if (priv->udevThread)
> - virThreadJoin(priv->udevThread);
> + if (priv->mdevctlTimeout != -1) {
> + virEventRemoveTimeout(priv->mdevctlTimeout);
> + priv->mdevctlTimeout = -1;
> + }
> +
> + if (priv->watch) {
> + virEventRemoveHandle(priv->watch);
> + priv->watch = -1;
> + }
Too many rebases… the diff should read as follows:
@@ -2278,11 +2380,12 @@ nodeStateShutdownWait(void)
return 0;
VIR_WITH_OBJECT_LOCK_GUARD(priv) {
- if (priv->initThread)
- virThreadJoin(priv->initThread);
if (priv->udevThread)
virThreadJoin(priv->udevThread);
}
+
+ if (priv->workerPool)
+ virThreadPoolDrain(priv->workerPool);
return 0;
}
[…snip]
--
Kind regards / Beste Grüße
Marc Hartmayer
IBM Deutschland Research & Development GmbH
Vorsitzender des Aufsichtsrats: Wolfgang Wendt
Geschäftsführung: David Faller
Sitz der Gesellschaft: Böblingen
Registergericht: Amtsgericht Stuttgart, HRB 243294
_______________________________________________
Devel mailing list -- devel@lists.libvirt.org
To unsubscribe send an email to devel-leave@lists.libvirt.org
On 4/19/24 11:04 AM, Marc Hartmayer wrote:
> On Fri, Apr 19, 2024 at 04:49 PM +0200, Marc Hartmayer <mhartmay@linux.ibm.com> wrote:
>> Use a worker pool for processing the events (e.g. udev, mdevctl config changes)
>> and the initialization instead of a separate initThread and a mdevctl-thread.
>> This has the large advantage that we can leverage the job API and now this
>> thread pool is responsible to do all the "costly-work" and emitting the libvirt
>> nodedev events.
>>
>> Signed-off-by: Marc Hartmayer <mhartmay@linux.ibm.com>
>> ---
>> src/node_device/node_device_udev.c | 244 +++++++++++++++++++++--------
>> 1 file changed, 179 insertions(+), 65 deletions(-)
>>
>> diff --git a/src/node_device/node_device_udev.c b/src/node_device/node_device_udev.c
>> index e4b1532dc385..67a8b5cd7132 100644
>
> […snip…]
>
>> }
>>
>> @@ -2278,11 +2380,19 @@ nodeStateShutdownWait(void)
>> return 0;
>>
>> VIR_WITH_OBJECT_LOCK_GUARD(priv) {
>> - if (priv->initThread)
>> - virThreadJoin(priv->initThread);
>> - if (priv->udevThread)
>> - virThreadJoin(priv->udevThread);
>
>> + if (priv->mdevctlTimeout != -1) {
>> + virEventRemoveTimeout(priv->mdevctlTimeout);
>> + priv->mdevctlTimeout = -1;
>> + }
>> +
>> + if (priv->watch) {
>> + virEventRemoveHandle(priv->watch);
>> + priv->watch = -1;
>> + }
>
> Too many rebases… the diff should read as follows:
>
> @@ -2278,11 +2380,12 @@ nodeStateShutdownWait(void)
> return 0;
>
> VIR_WITH_OBJECT_LOCK_GUARD(priv) {
> - if (priv->initThread)
> - virThreadJoin(priv->initThread);
> if (priv->udevThread)
> virThreadJoin(priv->udevThread);
> }
> +
> + if (priv->workerPool)
> + virThreadPoolDrain(priv->workerPool);
> return 0;
> }
>
> […snip]
>
Reviewed-by: Jonathon Jongsma <jjongsma@redhat.com>
_______________________________________________
Devel mailing list -- devel@lists.libvirt.org
To unsubscribe send an email to devel-leave@lists.libvirt.org
© 2016 - 2026 Red Hat, Inc.