[libvirt] [PATCH 06/38] virfdstream: Use messages instead of pipe

Michal Privoznik posted 38 patches 8 years, 10 months ago
There is a newer version of this series
[libvirt] [PATCH 06/38] virfdstream: Use messages instead of pipe
Posted by Michal Privoznik 8 years, 10 months ago
One big downside of using the pipe to transfer the data is that
we can transfer only bare data. No metadata can be carried
through unless some formatted messages are introduced. That would
be quite painful to achieve, so let's use a message queue instead.
It's fairly easy to exchange info between threads now that
iohelper is no longer used.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/util/virfdstream.c | 392 ++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 321 insertions(+), 71 deletions(-)

diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c
index 4efc65d..efd9199 100644
--- a/src/util/virfdstream.c
+++ b/src/util/virfdstream.c
@@ -49,6 +49,27 @@
 
 VIR_LOG_INIT("fdstream");
 
+typedef enum { /* kinds of message carried by the virFDStreamMsg queue */
+    VIR_FDSTREAM_MSG_TYPE_DATA, /* raw bytes, stored in the stream.data member */
+} virFDStreamMsgType;
+
+typedef struct _virFDStreamMsg virFDStreamMsg;
+typedef virFDStreamMsg *virFDStreamMsgPtr;
+struct _virFDStreamMsg { /* one node of the singly linked queue rooted at virFDStreamData.msg */
+    virFDStreamMsgPtr next; /* next (newer) queued message; NULL for the tail */
+
+    virFDStreamMsgType type; /* selects the active member of @stream */
+
+    union {
+        struct {
+            char *buf; /* payload; owned by the message and freed by virFDStreamMsgFree */
+            size_t len; /* number of valid payload bytes in @buf */
+            size_t offset; /* bytes of @buf already consumed (written out or read by the client) */
+        } data;
+    } stream;
+};
+
+
 /* Tunnelled migration stream support */
 typedef struct virFDStreamData virFDStreamData;
 typedef virFDStreamData *virFDStreamDataPtr;
@@ -80,18 +101,25 @@ struct virFDStreamData {
 
     /* Thread data */
     virThreadPtr thread;
+    virCond threadCond;
     int threadErr;
     bool threadQuit;
+    bool threadAbort;
+    bool threadDoRead;
+    virFDStreamMsgPtr msg;
 };
 
 static virClassPtr virFDStreamDataClass;
 
+static void virFDStreamMsgQueueFree(virFDStreamMsgPtr *queue); /* defined below; used by virFDStreamDataDispose */
+
 static void
 virFDStreamDataDispose(void *obj)
 {
     virFDStreamDataPtr fdst = obj;
 
     VIR_DEBUG("obj=%p", fdst);
+    virFDStreamMsgQueueFree(&fdst->msg); /* drop any messages still left in the queue */
 }
 
 static int virFDStreamDataOnceInit(void)
@@ -108,6 +136,66 @@ static int virFDStreamDataOnceInit(void)
 VIR_ONCE_GLOBAL_INIT(virFDStreamData)
 
 
+static void /* Append @msg at the tail of fdst's queue (taking ownership) and signal threadCond. */
+virFDStreamMsgQueuePush(virFDStreamDataPtr fdst,
+                        virFDStreamMsgPtr msg)
+{
+    virFDStreamMsgPtr *tmp = &fdst->msg; /* NOTE(review): no locking here; callers are expected to hold the fdst lock - confirm */
+
+    while (*tmp) /* walk to the NULL link at the tail, giving FIFO order */
+        tmp = &(*tmp)->next;
+
+    *tmp = msg;
+    virCondSignal(&fdst->threadCond); /* wake a waiter such as the worker in virFDStreamThread */
+}
+
+/* Unlink and return the head of fdst's queue (NULL if empty); always signals threadCond. */
+static virFDStreamMsgPtr
+virFDStreamMsgQueuePop(virFDStreamDataPtr fdst)
+{
+    virFDStreamMsgPtr tmp = fdst->msg;
+
+    if (tmp) {
+        fdst->msg = tmp->next;
+        tmp->next = NULL; /* detached: the caller now owns (and frees) this message */
+    }
+
+    virCondSignal(&fdst->threadCond); /* signalled even when the queue was already empty */
+    return tmp;
+}
+
+/* Free a single message and the payload it owns (not the rest of the queue); NULL is a no-op. */
+static void
+virFDStreamMsgFree(virFDStreamMsgPtr msg)
+{
+    if (!msg)
+        return;
+
+    switch (msg->type) {
+    case VIR_FDSTREAM_MSG_TYPE_DATA:
+        VIR_FREE(msg->stream.data.buf);
+        break;
+    }
+
+    VIR_FREE(msg);
+}
+
+/* Free every message in *@queue and reset *@queue to NULL (threadCond is not signalled). */
+static void
+virFDStreamMsgQueueFree(virFDStreamMsgPtr *queue)
+{
+    virFDStreamMsgPtr tmp = *queue;
+
+    while (tmp) {
+        virFDStreamMsgPtr next = tmp->next; /* save the link before tmp is freed */
+        virFDStreamMsgFree(tmp);
+        tmp = next;
+    }
+
+    *queue = NULL;
+}
+
+
 static int virFDStreamRemoveCallback(virStreamPtr stream)
 {
     virFDStreamDataPtr fdst = stream->privateData;
@@ -289,12 +377,86 @@ virFDStreamThreadDataFree(virFDStreamThreadDataPtr data)
     virObjectUnref(data->st);
     VIR_FREE(data->fdinname);
     VIR_FREE(data->fdoutname);
-    VIR_FORCE_CLOSE(data->fdin);
-    VIR_FORCE_CLOSE(data->fdout);
     VIR_FREE(data);
 }
 
 
+static ssize_t /* Read up to @buflen bytes from @fdin and queue them as one DATA message; returns bytes read or -1. */
+virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
+                        const int fdin,
+                        const char *fdinname,
+                        size_t buflen)
+{
+    virFDStreamMsgPtr msg = NULL;
+    char *buf = NULL;
+    ssize_t got;
+
+    if (VIR_ALLOC(msg) < 0)
+        goto error;
+
+    if (VIR_ALLOC_N(buf, buflen) < 0)
+        goto error;
+
+    if ((got = saferead(fdin, buf, buflen)) < 0) { /* NOTE(review): blocking read while virFDStreamThread holds the fdst lock */
+        virReportSystemError(errno,
+                             _("Unable to read %s"),
+                             fdinname);
+        goto error;
+    }
+
+    msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
+    msg->stream.data.buf = buf;
+    msg->stream.data.len = got; /* may be 0: an empty message is still queued */
+    buf = NULL; /* ownership moved into msg */
+
+    virFDStreamMsgQueuePush(fdst, msg);
+    msg = NULL; /* ownership moved into the queue */
+
+    return got;
+
+ error:
+    VIR_FREE(buf);
+    virFDStreamMsgFree(msg);
+    return -1;
+}
+
+/* Write the queue head to @fdout, dropping it once fully written; returns bytes written or -1. */
+static ssize_t
+virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
+                         const int fdout,
+                         const char *fdoutname)
+{
+    ssize_t got = -1; /* defined result (failure) if no case below handles msg->type */
+    virFDStreamMsgPtr msg = fdst->msg; /* virFDStreamThread calls this only with a non-empty queue */
+    bool pop = false;
+
+    switch (msg->type) {
+    case VIR_FDSTREAM_MSG_TYPE_DATA:
+        got = safewrite(fdout, /* NOTE(review): blocking write while virFDStreamThread holds the fdst lock */
+                        msg->stream.data.buf + msg->stream.data.offset,
+                        msg->stream.data.len - msg->stream.data.offset);
+        if (got < 0) {
+            virReportSystemError(errno,
+                                 _("Unable to write %s"),
+                                 fdoutname);
+            return -1;
+        }
+
+        msg->stream.data.offset += got;
+
+        pop = msg->stream.data.offset == msg->stream.data.len;
+        break;
+    }
+    /* A partially written message stays queued; the next call resumes at its offset. */
+    if (pop) {
+        virFDStreamMsgQueuePop(fdst);
+        virFDStreamMsgFree(msg);
+    }
+
+    return got; /* NOTE(review): 0 for an empty DATA message, which virFDStreamThread treats as EOF */
+}
+
+
 static void
 virFDStreamThread(void *opaque)
 {
@@ -306,14 +468,12 @@ virFDStreamThread(void *opaque)
     int fdout = data->fdout;
     char *fdoutname = data->fdoutname;
     virFDStreamDataPtr fdst = st->privateData;
-    char *buf = NULL;
+    bool doRead = fdst->threadDoRead;
     size_t buflen = 256 * 1024;
     size_t total = 0;
 
     virObjectRef(fdst);
-
-    if (VIR_ALLOC_N(buf, buflen) < 0)
-        goto error;
+    virObjectLock(fdst);
 
     while (1) {
         ssize_t got;
@@ -325,37 +485,49 @@ virFDStreamThread(void *opaque)
         if (buflen == 0)
             break; /* End of requested data from client */
 
-        if ((got = saferead(fdin, buf, buflen)) < 0) {
-            virReportSystemError(errno,
-                                 _("Unable to read %s"),
-                                 fdinname);
+        while (doRead == (fdst->msg != NULL) &&
+               !fdst->threadQuit) {
+            if (virCondWait(&fdst->threadCond, &fdst->parent.lock)) {
+                virReportSystemError(errno, "%s",
+                                     _("failed to wait on condition"));
+                goto error;
+            }
+        }
+
+        if (fdst->threadQuit) {
+            /* If stream abort was requested, quit early. */
+            if (fdst->threadAbort)
+                goto cleanup;
+
+            /* Otherwise flush buffers and quit gracefully. */
+            if (doRead == (fdst->msg != NULL))
+                break;
+        }
+
+        if (doRead)
+            got = virFDStreamThreadDoRead(fdst, fdin, fdinname, buflen);
+        else
+            got = virFDStreamThreadDoWrite(fdst, fdout, fdoutname);
+
+        if (got < 0)
             goto error;
-        }
 
         if (got == 0)
             break;
 
         total += got;
-
-        if (safewrite(fdout, buf, got) < 0) {
-            virReportSystemError(errno,
-                                 _("Unable to write %s"),
-                                 fdoutname);
-            goto error;
-        }
     }
 
  cleanup:
+    fdst->threadQuit = true;
+    virObjectUnlock(fdst);
     if (!virObjectUnref(fdst))
         st->privateData = NULL;
     virFDStreamThreadDataFree(data);
-    VIR_FREE(buf);
     return;
 
  error:
-    virObjectLock(fdst);
     fdst->threadErr = errno;
-    virObjectUnlock(fdst);
     goto cleanup;
 }
 
@@ -367,6 +539,10 @@ virFDStreamJoinWorker(virFDStreamDataPtr fdst, bool streamAbort)
     if (!fdst->thread)
         return 0;
 
+    fdst->threadAbort = streamAbort;
+    fdst->threadQuit = true;
+    virCondSignal(&fdst->threadCond);
+
     /* Give the thread a chance to lock the FD stream object. */
     virObjectUnlock(fdst);
     virThreadJoin(fdst->thread);
@@ -380,6 +556,7 @@ virFDStreamJoinWorker(virFDStreamDataPtr fdst, bool streamAbort)
     ret = 0;
  cleanup:
     VIR_FREE(fdst->thread);
+    virCondDestroy(&fdst->threadCond);
     return ret;
 }
 
@@ -426,11 +603,14 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
         fdst->abortCallbackDispatching = false;
     }
 
-    /* mutex locked */
-    ret = VIR_CLOSE(fdst->fd);
     if (virFDStreamJoinWorker(fdst, streamAbort) < 0)
         ret = -1;
 
+    /* mutex locked */
+    if ((ret = VIR_CLOSE(fdst->fd)) < 0)
+        virReportSystemError(errno, "%s",
+                             _("Unable to close"));
+
     st->privateData = NULL;
 
     /* call the internal stream closing callback */
@@ -467,7 +647,8 @@ virFDStreamAbort(virStreamPtr st)
 static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
 {
     virFDStreamDataPtr fdst = st->privateData;
-    int ret;
+    virFDStreamMsgPtr msg = NULL;
+    int ret = -1;
 
     if (nbytes > INT_MAX) {
         virReportSystemError(ERANGE, "%s",
@@ -495,25 +676,51 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
             nbytes = fdst->length - fdst->offset;
     }
 
- retry:
-    ret = write(fdst->fd, bytes, nbytes);
-    if (ret < 0) {
-        VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
-        if (errno == EAGAIN || errno == EWOULDBLOCK) {
-        VIR_WARNINGS_RESET
-            ret = -2;
-        } else if (errno == EINTR) {
-            goto retry;
-        } else {
-            ret = -1;
-            virReportSystemError(errno, "%s",
+    if (fdst->thread) {
+        char *buf;
+
+        if (fdst->threadQuit) {
+            virReportSystemError(EBADF, "%s",
                                  _("cannot write to stream"));
+            return -1;
+        }
+
+        if (VIR_ALLOC(msg) < 0 ||
+            VIR_ALLOC_N(buf, nbytes) < 0)
+            goto cleanup;
+
+        memcpy(buf, bytes, nbytes);
+        msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
+        msg->stream.data.buf = buf;
+        msg->stream.data.len = nbytes;
+
+        virFDStreamMsgQueuePush(fdst, msg);
+        msg = NULL;
+        ret = nbytes;
+    } else {
+     retry:
+        ret = write(fdst->fd, bytes, nbytes);
+        if (ret < 0) {
+            VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
+            if (errno == EAGAIN || errno == EWOULDBLOCK) {
+            VIR_WARNINGS_RESET
+                ret = -2;
+            } else if (errno == EINTR) {
+                goto retry;
+            } else {
+                ret = -1;
+                virReportSystemError(errno, "%s",
+                                     _("cannot write to stream"));
+            }
         }
-    } else if (fdst->length) {
-        fdst->offset += ret;
     }
 
+    if (fdst->length)
+        fdst->offset += ret;
+
+ cleanup:
     virObjectUnlock(fdst);
+    virFDStreamMsgFree(msg);
     return ret;
 }
 
@@ -521,7 +728,7 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
 static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
 {
     virFDStreamDataPtr fdst = st->privateData;
-    int ret;
+    int ret = -1;
 
     if (nbytes > INT_MAX) {
         virReportSystemError(ERANGE, "%s",
@@ -547,24 +754,70 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
             nbytes = fdst->length - fdst->offset;
     }
 
- retry:
-    ret = read(fdst->fd, bytes, nbytes);
-    if (ret < 0) {
-        VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
-        if (errno == EAGAIN || errno == EWOULDBLOCK) {
-        VIR_WARNINGS_RESET
-            ret = -2;
-        } else if (errno == EINTR) {
-            goto retry;
-        } else {
-            ret = -1;
-            virReportSystemError(errno, "%s",
-                                 _("cannot read from stream"));
+    if (fdst->thread) {
+        virFDStreamMsgPtr msg = NULL;
+
+        while (!(msg = fdst->msg)) {
+            if (fdst->threadQuit) {
+                if (nbytes) {
+                    virReportSystemError(EBADF, "%s",
+                                         _("stream is not open"));
+                } else {
+                    ret = 0;
+                }
+                goto cleanup;
+            } else {
+                virObjectUnlock(fdst);
+                virCondSignal(&fdst->threadCond);
+                virObjectLock(fdst);
+            }
+        }
+
+        if (msg->type != VIR_FDSTREAM_MSG_TYPE_DATA) {
+            /* Nope, nope, I'm outta here */
+            virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                           _("unexpected message type"));
+            goto cleanup;
+        }
+
+        if (nbytes > msg->stream.data.len - msg->stream.data.offset)
+            nbytes = msg->stream.data.len - msg->stream.data.offset;
+
+        memcpy(bytes,
+               msg->stream.data.buf + msg->stream.data.offset,
+               nbytes);
+
+        msg->stream.data.offset += nbytes;
+        if (msg->stream.data.offset == msg->stream.data.len) {
+            virFDStreamMsgQueuePop(fdst);
+            virFDStreamMsgFree(msg);
+        }
+
+        ret = nbytes;
+
+    } else {
+     retry:
+        ret = read(fdst->fd, bytes, nbytes);
+        if (ret < 0) {
+            VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
+            if (errno == EAGAIN || errno == EWOULDBLOCK) {
+            VIR_WARNINGS_RESET
+                ret = -2;
+            } else if (errno == EINTR) {
+                goto retry;
+            } else {
+                ret = -1;
+                virReportSystemError(errno, "%s",
+                                     _("cannot read from stream"));
+            }
+            goto cleanup;
         }
-    } else if (fdst->length) {
-        fdst->offset += ret;
     }
 
+    if (fdst->length)
+        fdst->offset += ret;
+
+ cleanup:
     virObjectUnlock(fdst);
     return ret;
 }
@@ -593,7 +846,7 @@ static int virFDStreamOpenInternal(virStreamPtr st,
     if (virFDStreamDataInitialize() < 0)
         return -1;
 
-    if ((st->flags & VIR_STREAM_NONBLOCK) &&
+    if ((st->flags & VIR_STREAM_NONBLOCK) && !threadData &&
         virSetNonBlock(fd) < 0) {
         virReportSystemError(errno, "%s", _("Unable to set non-blocking mode"));
         return -1;
@@ -609,11 +862,20 @@ static int virFDStreamOpenInternal(virStreamPtr st,
     st->privateData = fdst;
 
     if (threadData) {
+        /* The thread is going to do reads if fdin is set and fdout is not. */
+        fdst->threadDoRead = threadData->fdout == -1;
+
         /* Create the thread after fdst and st were initialized.
          * The thread worker expects them to be that way. */
         if (VIR_ALLOC(fdst->thread) < 0)
             goto error;
 
+        if (virCondInit(&fdst->threadCond) < 0) {
+            virReportSystemError(errno, "%s",
+                                 _("cannot initialize condition variable"));
+            goto error;
+        }
+
         if (virThreadCreate(fdst->thread,
                             true,
                             virFDStreamThread,
@@ -752,8 +1014,6 @@ virFDStreamOpenFileInternal(virStreamPtr st,
     if ((st->flags & VIR_STREAM_NONBLOCK) &&
         ((!S_ISCHR(sb.st_mode) &&
           !S_ISFIFO(sb.st_mode)) || forceIOHelper)) {
-        int fds[2] = { -1, -1 };
-
         if ((oflags & O_ACCMODE) == O_RDWR) {
             virReportError(VIR_ERR_INTERNAL_ERROR,
                            _("%s: Cannot request read and write flags together"),
@@ -761,12 +1021,6 @@ virFDStreamOpenFileInternal(virStreamPtr st,
             goto error;
         }
 
-        if (pipe(fds) < 0) {
-            virReportSystemError(errno, "%s",
-                                 _("Unable to create pipe"));
-            goto error;
-        }
-
         if (VIR_ALLOC(threadData) < 0)
             goto error;
 
@@ -775,18 +1029,14 @@ virFDStreamOpenFileInternal(virStreamPtr st,
 
         if ((oflags & O_ACCMODE) == O_RDONLY) {
             threadData->fdin = fd;
-            threadData->fdout = fds[1];
-            if (VIR_STRDUP(threadData->fdinname, path) < 0 ||
-                VIR_STRDUP(threadData->fdoutname, "pipe") < 0)
+            threadData->fdout = -1;
+            if (VIR_STRDUP(threadData->fdinname, path) < 0)
                 goto error;
-            fd = fds[0];
         } else {
-            threadData->fdin = fds[0];
+            threadData->fdin = -1;
             threadData->fdout = fd;
-            if (VIR_STRDUP(threadData->fdinname, "pipe") < 0 ||
-                VIR_STRDUP(threadData->fdoutname, path) < 0)
+            if (VIR_STRDUP(threadData->fdoutname, path) < 0)
                 goto error;
-            fd = fds[1];
         }
     }
 
-- 
2.10.2

--
libvir-list mailing list
libvir-list@redhat.com
https://www.redhat.com/mailman/listinfo/libvir-list
Re: [libvirt] [PATCH 06/38] virfdstream: Use messages instead of pipe
Posted by Daniel P. Berrange 8 years, 10 months ago
On Thu, Apr 13, 2017 at 03:31:14PM +0200, Michal Privoznik wrote:
> One big downside of using the pipe to transfer the data is that
> we can really transfer just bare data. No metadata can be carried
> through unless some formatted messages are introduced. That would
> be quite painful to achieve so let's use a message queue. It's
> fairly easy to exchange info between threads now that iohelper is
> no longer used.

I'm not seeing how this works correctly with the event loop.

> @@ -752,8 +1014,6 @@ virFDStreamOpenFileInternal(virStreamPtr st,
>      if ((st->flags & VIR_STREAM_NONBLOCK) &&
>          ((!S_ISCHR(sb.st_mode) &&
>            !S_ISFIFO(sb.st_mode)) || forceIOHelper)) {
> -        int fds[2] = { -1, -1 };
> -
>          if ((oflags & O_ACCMODE) == O_RDWR) {
>              virReportError(VIR_ERR_INTERNAL_ERROR,
>                             _("%s: Cannot request read and write flags together"),
> @@ -761,12 +1021,6 @@ virFDStreamOpenFileInternal(virStreamPtr st,
>              goto error;
>          }
>  
> -        if (pipe(fds) < 0) {
> -            virReportSystemError(errno, "%s",
> -                                 _("Unable to create pipe"));
> -            goto error;
> -        }

Here we previously created the pipe....

> @@ -775,18 +1029,14 @@ virFDStreamOpenFileInternal(virStreamPtr st,
>  
>          if ((oflags & O_ACCMODE) == O_RDONLY) {
>              threadData->fdin = fd;
> -            threadData->fdout = fds[1];
> -            if (VIR_STRDUP(threadData->fdinname, path) < 0 ||
> -                VIR_STRDUP(threadData->fdoutname, "pipe") < 0)
> +            threadData->fdout = -1;
> +            if (VIR_STRDUP(threadData->fdinname, path) < 0)
>                  goto error;
> -            fd = fds[0];

And here we set 'fd' to be the pipe

>          } else {
> -            threadData->fdin = fds[0];
> +            threadData->fdin = -1;
>              threadData->fdout = fd;
> -            if (VIR_STRDUP(threadData->fdinname, "pipe") < 0 ||
> -                VIR_STRDUP(threadData->fdoutname, path) < 0)
> +            if (VIR_STRDUP(threadData->fdoutname, path) < 0)
>                  goto error;
> -            fd = fds[1];

Likewise here

>          }
>      }

...now here 'fd' is passed to virFDStreamOpenInternal() and is the thing
that the event loop watches are registered against by virFDStreamAddCallback


With this change 'fd' is the actual plain file the thread is reading to/from,
so the callbacks are being registered against the plain file, not the pipe.

poll/select on POSIX always reports plain files as readable/writable even
when they would block.  So with this change we're just going to busy loop
in the main event thread even when we'll block, which defeats the whole
purpose of having an iohelper and/or thread.


Regards,
Daniel
-- 
|: http://berrange.com      -o-    http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org              -o-             http://virt-manager.org :|
|: http://entangle-photo.org       -o-    http://search.cpan.org/~danberr/ :|

--
libvir-list mailing list
libvir-list@redhat.com
https://www.redhat.com/mailman/listinfo/libvir-list
Re: [libvirt] [PATCH 06/38] virfdstream: Use messages instead of pipe
Posted by Michal Privoznik 8 years, 10 months ago
On 04/13/2017 03:55 PM, Daniel P. Berrange wrote:
> On Thu, Apr 13, 2017 at 03:31:14PM +0200, Michal Privoznik wrote:
>> One big downside of using the pipe to transfer the data is that
>> we can really transfer just bare data. No metadata can be carried
>> through unless some formatted messages are introduced. That would
>> be quite painful to achieve so let's use a message queue. It's
>> fairly easy to exchange info between threads now that iohelper is
>> no longer used.
> 
> I'm not seeing how this works correctly with the event loop.
> 
>> @@ -752,8 +1014,6 @@ virFDStreamOpenFileInternal(virStreamPtr st,
>>      if ((st->flags & VIR_STREAM_NONBLOCK) &&
>>          ((!S_ISCHR(sb.st_mode) &&
>>            !S_ISFIFO(sb.st_mode)) || forceIOHelper)) {
>> -        int fds[2] = { -1, -1 };
>> -
>>          if ((oflags & O_ACCMODE) == O_RDWR) {
>>              virReportError(VIR_ERR_INTERNAL_ERROR,
>>                             _("%s: Cannot request read and write flags together"),
>> @@ -761,12 +1021,6 @@ virFDStreamOpenFileInternal(virStreamPtr st,
>>              goto error;
>>          }
>>  
>> -        if (pipe(fds) < 0) {
>> -            virReportSystemError(errno, "%s",
>> -                                 _("Unable to create pipe"));
>> -            goto error;
>> -        }
> 
> Here we previously created the pipe....
> 
>> @@ -775,18 +1029,14 @@ virFDStreamOpenFileInternal(virStreamPtr st,
>>  
>>          if ((oflags & O_ACCMODE) == O_RDONLY) {
>>              threadData->fdin = fd;
>> -            threadData->fdout = fds[1];
>> -            if (VIR_STRDUP(threadData->fdinname, path) < 0 ||
>> -                VIR_STRDUP(threadData->fdoutname, "pipe") < 0)
>> +            threadData->fdout = -1;
>> +            if (VIR_STRDUP(threadData->fdinname, path) < 0)
>>                  goto error;
>> -            fd = fds[0];
> 
> And here we set 'fd' to be the pipe
> 
>>          } else {
>> -            threadData->fdin = fds[0];
>> +            threadData->fdin = -1;
>>              threadData->fdout = fd;
>> -            if (VIR_STRDUP(threadData->fdinname, "pipe") < 0 ||
>> -                VIR_STRDUP(threadData->fdoutname, path) < 0)
>> +            if (VIR_STRDUP(threadData->fdoutname, path) < 0)
>>                  goto error;
>> -            fd = fds[1];
> 
> Likewise here
> 
>>          }
>>      }
> 
> ...now here 'fd' is passed to virFDStreamOpenInternal() and is the thing
> that the event loop watches are registered against by virFDStreamAddCallback
> 
> 
> With this change 'fd' is the actual plain file the thread is reading to/from,
> so the callbacks are being registered against the plain file, not the pipe.
> 
> poll/select on POSIX always reports plain files as readable/writable even
> when they would block.  So with this change we're just going to busy loop
> in the main event thread even when we'll block, which defeats the whole
> purpose of having a iohelper and/or thread.

Oh, then I misunderstood what we discussed on IRC. The way I've
understood it was that if an FD is set to nonblock mode and poll()
claims there are some data available, subsequent read() might block. If
that was the case we would be safe with this code. However, I didn't
expect poll() to lie.

Is there any link for further reading on this? I guess we're not the only ones
who have to deal with this problem. Basically, any application using poll() with
disk read()/write() has to suffer from this.

So what are our options here? Because I don't see any right now.

Michal

--
libvir-list mailing list
libvir-list@redhat.com
https://www.redhat.com/mailman/listinfo/libvir-list
Re: [libvirt] [PATCH 06/38] virfdstream: Use messages instead of pipe
Posted by Daniel P. Berrange 8 years, 10 months ago
On Thu, Apr 13, 2017 at 06:52:31PM +0200, Michal Privoznik wrote:
> On 04/13/2017 03:55 PM, Daniel P. Berrange wrote:
> > On Thu, Apr 13, 2017 at 03:31:14PM +0200, Michal Privoznik wrote:
> >> One big downside of using the pipe to transfer the data is that
> >> we can really transfer just bare data. No metadata can be carried
> >> through unless some formatted messages are introduced. That would
> >> be quite painful to achieve so let's use a message queue. It's
> >> fairly easy to exchange info between threads now that iohelper is
> >> no longer used.
> > 
> > I'm not seeing how this works correctly with the event loop.
> > 
> >> @@ -752,8 +1014,6 @@ virFDStreamOpenFileInternal(virStreamPtr st,
> >>      if ((st->flags & VIR_STREAM_NONBLOCK) &&
> >>          ((!S_ISCHR(sb.st_mode) &&
> >>            !S_ISFIFO(sb.st_mode)) || forceIOHelper)) {
> >> -        int fds[2] = { -1, -1 };
> >> -
> >>          if ((oflags & O_ACCMODE) == O_RDWR) {
> >>              virReportError(VIR_ERR_INTERNAL_ERROR,
> >>                             _("%s: Cannot request read and write flags together"),
> >> @@ -761,12 +1021,6 @@ virFDStreamOpenFileInternal(virStreamPtr st,
> >>              goto error;
> >>          }
> >>  
> >> -        if (pipe(fds) < 0) {
> >> -            virReportSystemError(errno, "%s",
> >> -                                 _("Unable to create pipe"));
> >> -            goto error;
> >> -        }
> > 
> > Here we previously created the pipe....
> > 
> >> @@ -775,18 +1029,14 @@ virFDStreamOpenFileInternal(virStreamPtr st,
> >>  
> >>          if ((oflags & O_ACCMODE) == O_RDONLY) {
> >>              threadData->fdin = fd;
> >> -            threadData->fdout = fds[1];
> >> -            if (VIR_STRDUP(threadData->fdinname, path) < 0 ||
> >> -                VIR_STRDUP(threadData->fdoutname, "pipe") < 0)
> >> +            threadData->fdout = -1;
> >> +            if (VIR_STRDUP(threadData->fdinname, path) < 0)
> >>                  goto error;
> >> -            fd = fds[0];
> > 
> > And here we set 'fd' to be the pipe
> > 
> >>          } else {
> >> -            threadData->fdin = fds[0];
> >> +            threadData->fdin = -1;
> >>              threadData->fdout = fd;
> >> -            if (VIR_STRDUP(threadData->fdinname, "pipe") < 0 ||
> >> -                VIR_STRDUP(threadData->fdoutname, path) < 0)
> >> +            if (VIR_STRDUP(threadData->fdoutname, path) < 0)
> >>                  goto error;
> >> -            fd = fds[1];
> > 
> > Likewise here
> > 
> >>          }
> >>      }
> > 
> > ...now here 'fd' is passed to virFDStreamOpenInternal() and is the thing
> > that the event loop watches are registered against by virFDStreamAddCallback
> > 
> > 
> > With this change 'fd' is the actual plain file the thread is reading to/from,
> > so the callbacks are being registered against the plain file, not the pipe.
> > 
> > poll/select on POSIX always reports plain files as readable/writable even
> > when they would block.  So with this change we're just going to busy loop
> > in the main event thread even when we'll block, which defeats the whole
> > purpose of having a iohelper and/or thread.
> 
> Oh, I've misunderstood what we've discussed on IRC then. The way I've
> understood it was that if an FD is set to nonblock mode and poll()
> claims there are some data available, subsequent read() might block. If
> that was the case we would be safe with this code. However, I didn't
> expect poll() to lie.

This code wouldn't be safe - anytime poll claims data available, we *must*
be able to read without blocking.

> Any link for further reading on this? I guess it's not only us who has
> to deal with this problem. Basically any application with poll() and
> disk read()/write() has to suffer from this.

Yes, that's correct - QEMU has the same issue. For example, that is why there
is no 'file:' protocol for migration - it would block the QEMU
main loop.

> So what are our options here? Because I don't see any right now.

IIUC, you didn't want to use a pipe because you want to send structured
messages, not just plain data. If we just have a linked list of messages
there's nothing we can poll on, so we need to keep the pipe in use, but
find a way to get the special messages in the flow.

I think we could do a trick where we have two pipes in use, one for
monitoring the readability, and one for monitoring writability.


When the I/O thread has data on the queue ready for read by the main
thread, it can write a single byte to the read-monitor pipe.

When the I/O thread is ready to accept more data to write from the
main thread, it can write a single byte to the write-monitor pipe.

The main thread would monitor for POLLIN condition on both the
read-monitor pipe and write-monitor pipe.

BTW, we also need to make sure the I/O thread doesn't proactively
queue too much data on the message queue when reading it, in case
the main thread is being slow at consuming this read data and
sending it to the TCP client.

Regards,
Daniel
-- 
|: http://berrange.com      -o-    http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org              -o-             http://virt-manager.org :|
|: http://entangle-photo.org       -o-    http://search.cpan.org/~danberr/ :|

--
libvir-list mailing list
libvir-list@redhat.com
https://www.redhat.com/mailman/listinfo/libvir-list
Re: [libvirt] [PATCH 06/38] virfdstream: Use messages instead of pipe
Posted by Michal Privoznik 8 years, 9 months ago
On 04/13/2017 07:13 PM, Daniel P. Berrange wrote:
> On Thu, Apr 13, 2017 at 06:52:31PM +0200, Michal Privoznik wrote:
>> On 04/13/2017 03:55 PM, Daniel P. Berrange wrote:
>>> On Thu, Apr 13, 2017 at 03:31:14PM +0200, Michal Privoznik wrote:
>>>> One big downside of using the pipe to transfer the data is that
>>>> we can really transfer just bare data. No metadata can be carried
>>>> through unless some formatted messages are introduced. That would
>>>> be quite painful to achieve so let's use a message queue. It's
>>>> fairly easy to exchange info between threads now that iohelper is
>>>> no longer used.
>>>
>>> I'm not seeing how this works correctly with the event loop.
>>>
>>>> @@ -752,8 +1014,6 @@ virFDStreamOpenFileInternal(virStreamPtr st,
>>>>      if ((st->flags & VIR_STREAM_NONBLOCK) &&
>>>>          ((!S_ISCHR(sb.st_mode) &&
>>>>            !S_ISFIFO(sb.st_mode)) || forceIOHelper)) {
>>>> -        int fds[2] = { -1, -1 };
>>>> -
>>>>          if ((oflags & O_ACCMODE) == O_RDWR) {
>>>>              virReportError(VIR_ERR_INTERNAL_ERROR,
>>>>                             _("%s: Cannot request read and write flags together"),
>>>> @@ -761,12 +1021,6 @@ virFDStreamOpenFileInternal(virStreamPtr st,
>>>>              goto error;
>>>>          }
>>>>
>>>> -        if (pipe(fds) < 0) {
>>>> -            virReportSystemError(errno, "%s",
>>>> -                                 _("Unable to create pipe"));
>>>> -            goto error;
>>>> -        }
>>>
>>> Here we previously created the pipe....
>>>
>>>> @@ -775,18 +1029,14 @@ virFDStreamOpenFileInternal(virStreamPtr st,
>>>>
>>>>          if ((oflags & O_ACCMODE) == O_RDONLY) {
>>>>              threadData->fdin = fd;
>>>> -            threadData->fdout = fds[1];
>>>> -            if (VIR_STRDUP(threadData->fdinname, path) < 0 ||
>>>> -                VIR_STRDUP(threadData->fdoutname, "pipe") < 0)
>>>> +            threadData->fdout = -1;
>>>> +            if (VIR_STRDUP(threadData->fdinname, path) < 0)
>>>>                  goto error;
>>>> -            fd = fds[0];
>>>
>>> And here we set 'fd' to be the pipe
>>>
>>>>          } else {
>>>> -            threadData->fdin = fds[0];
>>>> +            threadData->fdin = -1;
>>>>              threadData->fdout = fd;
>>>> -            if (VIR_STRDUP(threadData->fdinname, "pipe") < 0 ||
>>>> -                VIR_STRDUP(threadData->fdoutname, path) < 0)
>>>> +            if (VIR_STRDUP(threadData->fdoutname, path) < 0)
>>>>                  goto error;
>>>> -            fd = fds[1];
>>>
>>> Likewise here
>>>
>>>>          }
>>>>      }
>>>
>>> ...now here 'fd' is passed to virFDStreamOpenInternal() and is the thing
>>> that the event loop watches are registered against by virFDStreamAddCallback
>>>
>>>
>>> With this change 'fd' is the actual plain file the thread is reading to/from,
>>> so the callbacks are being registered against the plain file, not the pipe.
>>>
>>> poll/select on POSIX always reports plain files as readable/writable even
>>> when they would block.  So with this change we're just going to busy loop
>>> in the main event thread even when we'll block, which defeats the whole
>>> purpose of having an iohelper and/or thread.
>>
>> Oh, I've misunderstood what we've discussed on IRC then. The way I've
>> understood it was that if an FD is set to nonblock mode and poll()
>> claims there are some data available, subsequent read() might block. If
>> that was the case we would be safe with this code. However, I didn't
>> expect poll() to lie.
>
> This code wouldn't be safe - anytime poll claims data available, we *must*
> be able to read without blocking.
>
>> Any link for further reading on this? I guess it's not only us who has
>> to deal with this problem. Basically any application with poll() and
>> disk read()/write() has to suffer from this.
>
> Yes, that's correct - QEMU has the same issue for example - it is why there
> is no 'file:' protocol for migration for example - it would block the QEMU
> main loop.
>
>> So what are our options here? Because I don't see any right now.
>
> IIUC, you didn't want to use a pipe because you want to send structured
> messages, not just plain data. If we just have a linked list of messages
> there's nothing we can poll on, so we need to keep the pipe in use, but
> find a way to get the special messages in the flow.
>
> I think we could do a trick where we have two pipes in use, one for
> monitoring the readability, and one for monitoring writability.
>
>
> When the I/O thread has data on the queue ready for read by the main
> thread, it can write a single byte to the read-monitor pipe.
>
> When the I/O thread is ready to accept more data to write from the
> main thread, it can write a single byte to the write-monitor pipe.
>
> The main thread would monitor for POLLIN condition on both the
> read-monitor pipe and write-monitor pipe.

Ah, indeed. This could work. But I have also thought about a different approach.
What I really need is to transfer "you're in a data/hole X bytes long"
besides the actual data. So I can keep using the pipe for transferring the
data as it is done currently, and store the metadata in a structured message
that the thread would write/read and the event loop would read/write.

>
> BTW, we also need to make sure the I/O thread doesn't proactively
> queue too much data on the message queue when reading it, in case
> the main thread is being slow at consuming this read data and
> sending it to the TCP client.

Sure. Currently, with this implementation there's always one message
with a 4MiB buffer in the queue. Even though the code is prepared for a queue
of messages, in practice there is never more than one message in the queue.

Michal

--
libvir-list mailing list
libvir-list@redhat.com
https://www.redhat.com/mailman/listinfo/libvir-list
Re: [libvirt] [PATCH 06/38] virfdstream: Use messages instead of pipe
Posted by Daniel P. Berrange 8 years, 9 months ago
On Tue, Apr 18, 2017 at 02:00:09PM +0200, Michal Privoznik wrote:
> On 04/13/2017 07:13 PM, Daniel P. Berrange wrote:
> > On Thu, Apr 13, 2017 at 06:52:31PM +0200, Michal Privoznik wrote:
> > > On 04/13/2017 03:55 PM, Daniel P. Berrange wrote:
> > > > On Thu, Apr 13, 2017 at 03:31:14PM +0200, Michal Privoznik wrote:
> > > > > One big downside of using the pipe to transfer the data is that
> > > > > we can really transfer just bare data. No metadata can be carried
> > > > > through unless some formatted messages are introduced. That would
> > > > > be quite painful to achieve so let's use a message queue. It's
> > > > > fairly easy to exchange info between threads now that iohelper is
> > > > > no longer used.
> > > > 
> > > > I'm not seeing how this works correctly with the event loop.
> > > > 
> > > > > @@ -752,8 +1014,6 @@ virFDStreamOpenFileInternal(virStreamPtr st,
> > > > >      if ((st->flags & VIR_STREAM_NONBLOCK) &&
> > > > >          ((!S_ISCHR(sb.st_mode) &&
> > > > >            !S_ISFIFO(sb.st_mode)) || forceIOHelper)) {
> > > > > -        int fds[2] = { -1, -1 };
> > > > > -
> > > > >          if ((oflags & O_ACCMODE) == O_RDWR) {
> > > > >              virReportError(VIR_ERR_INTERNAL_ERROR,
> > > > >                             _("%s: Cannot request read and write flags together"),
> > > > > @@ -761,12 +1021,6 @@ virFDStreamOpenFileInternal(virStreamPtr st,
> > > > >              goto error;
> > > > >          }
> > > > > 
> > > > > -        if (pipe(fds) < 0) {
> > > > > -            virReportSystemError(errno, "%s",
> > > > > -                                 _("Unable to create pipe"));
> > > > > -            goto error;
> > > > > -        }
> > > > 
> > > > Here we previously created the pipe....
> > > > 
> > > > > @@ -775,18 +1029,14 @@ virFDStreamOpenFileInternal(virStreamPtr st,
> > > > > 
> > > > >          if ((oflags & O_ACCMODE) == O_RDONLY) {
> > > > >              threadData->fdin = fd;
> > > > > -            threadData->fdout = fds[1];
> > > > > -            if (VIR_STRDUP(threadData->fdinname, path) < 0 ||
> > > > > -                VIR_STRDUP(threadData->fdoutname, "pipe") < 0)
> > > > > +            threadData->fdout = -1;
> > > > > +            if (VIR_STRDUP(threadData->fdinname, path) < 0)
> > > > >                  goto error;
> > > > > -            fd = fds[0];
> > > > 
> > > > And here we set 'fd' to be the pipe
> > > > 
> > > > >          } else {
> > > > > -            threadData->fdin = fds[0];
> > > > > +            threadData->fdin = -1;
> > > > >              threadData->fdout = fd;
> > > > > -            if (VIR_STRDUP(threadData->fdinname, "pipe") < 0 ||
> > > > > -                VIR_STRDUP(threadData->fdoutname, path) < 0)
> > > > > +            if (VIR_STRDUP(threadData->fdoutname, path) < 0)
> > > > >                  goto error;
> > > > > -            fd = fds[1];
> > > > 
> > > > Likewise here
> > > > 
> > > > >          }
> > > > >      }
> > > > 
> > > > ...now here 'fd' is passed to virFDStreamOpenInternal() and is the thing
> > > > that the event loop watches are registered against by virFDStreamAddCallback
> > > > 
> > > > 
> > > > With this change 'fd' is the actual plain file the thread is reading to/from,
> > > > so the callbacks are being registered against the plain file, not the pipe.
> > > > 
> > > > poll/select on POSIX always reports plain files as readable/writable even
> > > > when they would block.  So with this change we're just going to busy loop
> > > > in the main event thread even when we'll block, which defeats the whole
> > > > purpose of having an iohelper and/or thread.
> > > 
> > > Oh, I've misunderstood what we've discussed on IRC then. The way I've
> > > understood it was that if an FD is set to nonblock mode and poll()
> > > claims there are some data available, subsequent read() might block. If
> > > that was the case we would be safe with this code. However, I didn't
> > > expect poll() to lie.
> > 
> > This code wouldn't be safe - anytime poll claims data available, we *must*
> > be able to read without blocking.
> > 
> > > Any link for further reading on this? I guess it's not only us who has
> > > to deal with this problem. Basically any application with poll() and
> > > disk read()/write() has to suffer from this.
> > 
> > Yes, that's correct - QEMU has the same issue for example - it is why there
> > is no 'file:' protocol for migration for example - it would block the QEMU
> > main loop.
> > 
> > > So what are our options here? Because I don't see any right now.
> > 
> > IIUC, you didn't want to use a pipe because you want to send structured
> > messages, not just plain data. If we just have a linked list of messages
> > there's nothing we can poll on, so we need to keep the pipe in use, but
> > find a way to get the special messages in the flow.
> > 
> > I think we could do a trick where we have two pipes in use, one for
> > monitoring the readability, and one for monitoring writability.
> > 
> > 
> > When the I/O thread has data on the queue ready for read by the main
> > thread, it can write a single byte to the read-monitor pipe.
> > 
> > When the I/O thread is ready to accept more data to write from the
> > main thread, it can write a single byte to the write-monitor pipe.
> > 
> > The main thread would monitor for POLLIN condition on both the
> > read-monitor pipe and write-monitor pipe.
> 
> Ah, indeed. This could work. But I have also thought about a different approach.
> What I really need is to transfer "you're in a data/hole X bytes long" besides
> the actual data. So I can keep using the pipe for transferring the data as it
> is done currently, and store the metadata in a structured message that the
> thread would write/read and the event loop would read/write.

Sure, that works too. It just depends on how much you care about optimizing
performance - avoiding the pipe removes the data copies between kernel
and userspace and back again, which could improve throughput.

> > BTW, we also need to make sure the I/O thread doesn't proactively
> > queue too much data on the message queue when reading it, in case
> > the main thread is being slow at consuming this read data and
> > sending it to the TCP client.
> 
> Sure. Currently, with this implementation there's always one message with
> 4MiB buffer in the queue. Even though it's prepared for a queue of messages,
> there is no more than 1 message in the queue really.

Regards,
Daniel
-- 
|: http://berrange.com      -o-    http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org              -o-             http://virt-manager.org :|
|: http://entangle-photo.org       -o-    http://search.cpan.org/~danberr/ :|

--
libvir-list mailing list
libvir-list@redhat.com
https://www.redhat.com/mailman/listinfo/libvir-list
Re: [libvirt] [PATCH 06/38] virfdstream: Use messages instead of pipe
Posted by Michal Privoznik 8 years, 9 months ago
On 04/18/2017 02:03 PM, Daniel P. Berrange wrote:
> On Tue, Apr 18, 2017 at 02:00:09PM +0200, Michal Privoznik wrote:
>> On 04/13/2017 07:13 PM, Daniel P. Berrange wrote:
>>> On Thu, Apr 13, 2017 at 06:52:31PM +0200, Michal Privoznik wrote:
>>>> On 04/13/2017 03:55 PM, Daniel P. Berrange wrote:
>>>>> On Thu, Apr 13, 2017 at 03:31:14PM +0200, Michal Privoznik wrote:
>>>>>> One big downside of using the pipe to transfer the data is that
>>>>>> we can really transfer just bare data. No metadata can be carried
>>>>>> through unless some formatted messages are introduced. That would
>>>>>> be quite painful to achieve so let's use a message queue. It's
>>>>>> fairly easy to exchange info between threads now that iohelper is
>>>>>> no longer used.
>>>>>
>>>>> I'm not seeing how this works correctly with the event loop.
>>>>>
>>>>>> @@ -752,8 +1014,6 @@ virFDStreamOpenFileInternal(virStreamPtr st,
>>>>>>      if ((st->flags & VIR_STREAM_NONBLOCK) &&
>>>>>>          ((!S_ISCHR(sb.st_mode) &&
>>>>>>            !S_ISFIFO(sb.st_mode)) || forceIOHelper)) {
>>>>>> -        int fds[2] = { -1, -1 };
>>>>>> -
>>>>>>          if ((oflags & O_ACCMODE) == O_RDWR) {
>>>>>>              virReportError(VIR_ERR_INTERNAL_ERROR,
>>>>>>                             _("%s: Cannot request read and write flags together"),
>>>>>> @@ -761,12 +1021,6 @@ virFDStreamOpenFileInternal(virStreamPtr st,
>>>>>>              goto error;
>>>>>>          }
>>>>>>
>>>>>> -        if (pipe(fds) < 0) {
>>>>>> -            virReportSystemError(errno, "%s",
>>>>>> -                                 _("Unable to create pipe"));
>>>>>> -            goto error;
>>>>>> -        }
>>>>>
>>>>> Here we previously created the pipe....
>>>>>
>>>>>> @@ -775,18 +1029,14 @@ virFDStreamOpenFileInternal(virStreamPtr st,
>>>>>>
>>>>>>          if ((oflags & O_ACCMODE) == O_RDONLY) {
>>>>>>              threadData->fdin = fd;
>>>>>> -            threadData->fdout = fds[1];
>>>>>> -            if (VIR_STRDUP(threadData->fdinname, path) < 0 ||
>>>>>> -                VIR_STRDUP(threadData->fdoutname, "pipe") < 0)
>>>>>> +            threadData->fdout = -1;
>>>>>> +            if (VIR_STRDUP(threadData->fdinname, path) < 0)
>>>>>>                  goto error;
>>>>>> -            fd = fds[0];
>>>>>
>>>>> And here we set 'fd' to be the pipe
>>>>>
>>>>>>          } else {
>>>>>> -            threadData->fdin = fds[0];
>>>>>> +            threadData->fdin = -1;
>>>>>>              threadData->fdout = fd;
>>>>>> -            if (VIR_STRDUP(threadData->fdinname, "pipe") < 0 ||
>>>>>> -                VIR_STRDUP(threadData->fdoutname, path) < 0)
>>>>>> +            if (VIR_STRDUP(threadData->fdoutname, path) < 0)
>>>>>>                  goto error;
>>>>>> -            fd = fds[1];
>>>>>
>>>>> Likewise here
>>>>>
>>>>>>          }
>>>>>>      }
>>>>>
>>>>> ...now here 'fd' is passed to virFDStreamOpenInternal() and is the thing
>>>>> that the event loop watches are registered against by virFDStreamAddCallback
>>>>>
>>>>>
>>>>> With this change 'fd' is the actual plain file the thread is reading to/from,
>>>>> so the callbacks are being registered against the plain file, not the pipe.
>>>>>
>>>>> poll/select on POSIX always reports plain files as readable/writable even
>>>>> when they would block.  So with this change we're just going to busy loop
>>>>> in the main event thread even when we'll block, which defeats the whole
>>>>> purpose of having an iohelper and/or thread.
>>>>
>>>> Oh, I've misunderstood what we've discussed on IRC then. The way I've
>>>> understood it was that if an FD is set to nonblock mode and poll()
>>>> claims there are some data available, subsequent read() might block. If
>>>> that was the case we would be safe with this code. However, I didn't
>>>> expect poll() to lie.
>>>
>>> This code wouldn't be safe - anytime poll claims data available, we *must*
>>> be able to read without blocking.
>>>
>>>> Any link for further reading on this? I guess it's not only us who has
>>>> to deal with this problem. Basically any application with poll() and
>>>> disk read()/write() has to suffer from this.
>>>
>>> Yes, that's correct - QEMU has the same issue for example - it is why there
>>> is no 'file:' protocol for migration for example - it would block the QEMU
>>> main loop.
>>>
>>>> So what are our options here? Because I don't see any right now.
>>>
>>> IIUC, you didn't want to use a pipe because you want to send structured
>>> messages, not just plain data. If we just have a linked list of messages
>>> there's nothing we can poll on, so we need to keep the pipe in use, but
>>> find a way to get the special messages in the flow.
>>>
>>> I think we could do a trick where we have two pipes in use, one for
>>> monitoring the readability, and one for monitoring writability.
>>>
>>>
>>> When the I/O thread has data on the queue ready for read by the main
>>> thread, it can write a single byte to the read-monitor pipe.
>>>
>>> When the I/O thread is ready to accept more data to write from the
>>> main thread, it can write a single byte to the write-monitor pipe.
>>>
>>> The main thread would monitor for POLLIN condition on both the
>>> read-monitor pipe and write-monitor pipe.
>>
>> Ah, indeed. This could work. But I have also thought about a different approach.
>> What I really need is to transfer "you're in a data/hole X bytes long" besides
>> the actual data. So I can keep using the pipe for transferring the data as it
>> is done currently, and store the metadata in a structured message that the
>> thread would write/read and the event loop would read/write.
>
> Sure, that works too. Just depends how much you care about optimizing
> performance - avoiding the pipe removes the data copies between kernel
> and userspace and back again, which could improve throughput.

Good point. So let me respin my patches with your approach implemented.

Michal

--
libvir-list mailing list
libvir-list@redhat.com
https://www.redhat.com/mailman/listinfo/libvir-list