From nobody Sun Feb 8 18:43:47 2026 Delivered-To: importer@patchew.org Received-SPF: pass (zoho.com: domain of redhat.com designates 209.132.183.28 as permitted sender) client-ip=209.132.183.28; envelope-from=libvir-list-bounces@redhat.com; helo=mx1.redhat.com; Authentication-Results: mx.zoho.com; spf=pass (zoho.com: domain of redhat.com designates 209.132.183.28 as permitted sender) smtp.mailfrom=libvir-list-bounces@redhat.com; Return-Path: Received: from mx1.redhat.com (mx1.redhat.com [209.132.183.28]) by mx.zohomail.com with SMTPS id 14920903283961005.7881190675498; Thu, 13 Apr 2017 06:32:08 -0700 (PDT) Received: from smtp.corp.redhat.com (int-mx02.intmail.prod.int.phx2.redhat.com [10.5.11.12]) (using TLSv1.2 with cipher AECDH-AES256-SHA (256/256 bits)) (No client certificate requested) by mx1.redhat.com (Postfix) with ESMTPS id EE9067F7A9; Thu, 13 Apr 2017 13:32:06 +0000 (UTC) Received: from colo-mx.corp.redhat.com (unknown [10.5.11.21]) by smtp.corp.redhat.com (Postfix) with ESMTPS id BF2D07FB95; Thu, 13 Apr 2017 13:32:06 +0000 (UTC) Received: from lists01.pubmisc.prod.ext.phx2.redhat.com (lists01.pubmisc.prod.ext.phx2.redhat.com [10.5.19.33]) by colo-mx.corp.redhat.com (Postfix) with ESMTP id 6F4745EC67; Thu, 13 Apr 2017 13:32:06 +0000 (UTC) Received: from smtp.corp.redhat.com (int-mx05.intmail.prod.int.phx2.redhat.com [10.5.11.15]) by lists01.pubmisc.prod.ext.phx2.redhat.com (8.13.8/8.13.8) with ESMTP id v3DDVusC012009 for ; Thu, 13 Apr 2017 09:31:56 -0400 Received: by smtp.corp.redhat.com (Postfix) id DF1B77A428; Thu, 13 Apr 2017 13:31:56 +0000 (UTC) Received: from moe.brq.redhat.com (dhcp129-131.brq.redhat.com [10.34.129.131]) by smtp.corp.redhat.com (Postfix) with ESMTP id 414737A41E for ; Thu, 13 Apr 2017 13:31:56 +0000 (UTC) DMARC-Filter: OpenDMARC Filter v1.3.2 mx1.redhat.com EE9067F7A9 Authentication-Results: ext-mx04.extmail.prod.ext.phx2.redhat.com; dmarc=none (p=none dis=none) header.from=redhat.com Authentication-Results: ext-mx04.extmail.prod.ext.phx2.redhat.com; spf=pass smtp.mailfrom=libvir-list-bounces@redhat.com DKIM-Filter: OpenDKIM Filter v2.11.0 mx1.redhat.com EE9067F7A9 From: Michal Privoznik To: libvir-list@redhat.com Date: Thu, 13 Apr 2017 15:31:14 +0200 Message-Id: <212752bbb3d49a8f3f4f1a43a1165218446a01cc.1492089792.git.mprivozn@redhat.com> In-Reply-To: References: In-Reply-To: References: X-Scanned-By: MIMEDefang 2.79 on 10.5.11.15 X-loop: libvir-list@redhat.com Subject: [libvirt] [PATCH 06/38] virfdstream: Use messages instead of pipe X-BeenThere: libvir-list@redhat.com X-Mailman-Version: 2.1.12 Precedence: junk List-Id: Development discussions about the libvirt library & tools List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , MIME-Version: 1.0 Content-Transfer-Encoding: quoted-printable Sender: libvir-list-bounces@redhat.com Errors-To: libvir-list-bounces@redhat.com X-Scanned-By: MIMEDefang 2.79 on 10.5.11.12 X-Greylist: Sender IP whitelisted, not delayed by milter-greylist-4.5.16 (mx1.redhat.com [10.5.110.28]); Thu, 13 Apr 2017 13:32:07 +0000 (UTC) X-ZohoMail: RSF_0 Z_629925259 SPT_0 Content-Type: text/plain; charset="utf-8" One big downside of using the pipe to transfer the data is that we can really transfer just bare data. No metadata can be carried through unless some formatted messages are introduced. That would be quite painful to achieve so let's use a message queue. It's fairly easy to exchange info between threads now that iohelper is no longer used. Signed-off-by: Michal Privoznik --- src/util/virfdstream.c | 392 ++++++++++++++++++++++++++++++++++++++++-----= ---- 1 file changed, 321 insertions(+), 71 deletions(-) diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c index 4efc65d..efd9199 100644 --- a/src/util/virfdstream.c +++ b/src/util/virfdstream.c @@ -49,6 +49,27 @@ =20 VIR_LOG_INIT("fdstream"); =20 +typedef enum { + VIR_FDSTREAM_MSG_TYPE_DATA, +} virFDStreamMsgType; + +typedef struct _virFDStreamMsg virFDStreamMsg; +typedef virFDStreamMsg *virFDStreamMsgPtr; +struct _virFDStreamMsg { + virFDStreamMsgPtr next; + + virFDStreamMsgType type; + + union { + struct { + char *buf; + size_t len; + size_t offset; + } data; + } stream; +}; + + /* Tunnelled migration stream support */ typedef struct virFDStreamData virFDStreamData; typedef virFDStreamData *virFDStreamDataPtr; @@ -80,18 +101,25 @@ struct virFDStreamData { =20 /* Thread data */ virThreadPtr thread; + virCond threadCond; int threadErr; bool threadQuit; + bool threadAbort; + bool threadDoRead; + virFDStreamMsgPtr msg; }; =20 static virClassPtr virFDStreamDataClass; =20 +static void virFDStreamMsgQueueFree(virFDStreamMsgPtr *queue); + static void virFDStreamDataDispose(void *obj) { virFDStreamDataPtr fdst =3D obj; =20 VIR_DEBUG("obj=3D%p", fdst); + virFDStreamMsgQueueFree(&fdst->msg); } =20 static int virFDStreamDataOnceInit(void) @@ -108,6 +136,66 @@ static int virFDStreamDataOnceInit(void) VIR_ONCE_GLOBAL_INIT(virFDStreamData) =20 =20 +static void +virFDStreamMsgQueuePush(virFDStreamDataPtr fdst, + virFDStreamMsgPtr msg) +{ + virFDStreamMsgPtr *tmp =3D &fdst->msg; + + while (*tmp) + tmp =3D &(*tmp)->next; + + *tmp =3D msg; + virCondSignal(&fdst->threadCond); +} + + +static virFDStreamMsgPtr +virFDStreamMsgQueuePop(virFDStreamDataPtr fdst) +{ + virFDStreamMsgPtr tmp =3D fdst->msg; + + if (tmp) { + fdst->msg =3D tmp->next; + tmp->next =3D NULL; + } + + virCondSignal(&fdst->threadCond); + return tmp; +} + + +static void +virFDStreamMsgFree(virFDStreamMsgPtr msg) +{ + if (!msg) + return; + + switch (msg->type) { + case VIR_FDSTREAM_MSG_TYPE_DATA: + VIR_FREE(msg->stream.data.buf); + break; + } + + VIR_FREE(msg); +} + + +static void +virFDStreamMsgQueueFree(virFDStreamMsgPtr *queue) +{ + virFDStreamMsgPtr tmp =3D *queue; + + while (tmp) { + virFDStreamMsgPtr next =3D tmp->next; + virFDStreamMsgFree(tmp); + tmp =3D next; + } + + *queue =3D NULL; +} + + static int virFDStreamRemoveCallback(virStreamPtr stream) { virFDStreamDataPtr fdst =3D stream->privateData; @@ -289,12 +377,86 @@ virFDStreamThreadDataFree(virFDStreamThreadDataPtr da= ta) virObjectUnref(data->st); VIR_FREE(data->fdinname); VIR_FREE(data->fdoutname); - VIR_FORCE_CLOSE(data->fdin); - VIR_FORCE_CLOSE(data->fdout); VIR_FREE(data); } =20 =20 +static ssize_t +virFDStreamThreadDoRead(virFDStreamDataPtr fdst, + const int fdin, + const char *fdinname, + size_t buflen) +{ + virFDStreamMsgPtr msg =3D NULL; + char *buf =3D NULL; + ssize_t got; + + if (VIR_ALLOC(msg) < 0) + goto error; + + if (VIR_ALLOC_N(buf, buflen) < 0) + goto error; + + if ((got =3D saferead(fdin, buf, buflen)) < 0) { + virReportSystemError(errno, + _("Unable to read %s"), + fdinname); + goto error; + } + + msg->type =3D VIR_FDSTREAM_MSG_TYPE_DATA; + msg->stream.data.buf =3D buf; + msg->stream.data.len =3D got; + buf =3D NULL; + + virFDStreamMsgQueuePush(fdst, msg); + msg =3D NULL; + + return got; + + error: + VIR_FREE(buf); + virFDStreamMsgFree(msg); + return -1; +} + + +static ssize_t +virFDStreamThreadDoWrite(virFDStreamDataPtr fdst, + const int fdout, + const char *fdoutname) +{ + ssize_t got; + virFDStreamMsgPtr msg =3D fdst->msg; + bool pop =3D false; + + switch (msg->type) { + case VIR_FDSTREAM_MSG_TYPE_DATA: + got =3D safewrite(fdout, + msg->stream.data.buf + msg->stream.data.offset, + msg->stream.data.len - msg->stream.data.offset); + if (got < 0) { + virReportSystemError(errno, + _("Unable to write %s"), + fdoutname); + return -1; + } + + msg->stream.data.offset +=3D got; + + pop =3D msg->stream.data.offset =3D=3D msg->stream.data.len; + break; + } + + if (pop) { + virFDStreamMsgQueuePop(fdst); + virFDStreamMsgFree(msg); + } + + return got; +} + + static void virFDStreamThread(void *opaque) { @@ -306,14 +468,12 @@ virFDStreamThread(void *opaque) int fdout =3D data->fdout; char *fdoutname =3D data->fdoutname; virFDStreamDataPtr fdst =3D st->privateData; - char *buf =3D NULL; + bool doRead =3D fdst->threadDoRead; size_t buflen =3D 256 * 1024; size_t total =3D 0; =20 virObjectRef(fdst); - - if (VIR_ALLOC_N(buf, buflen) < 0) - goto error; + virObjectLock(fdst); =20 while (1) { ssize_t got; @@ -325,37 +485,49 @@ virFDStreamThread(void *opaque) if (buflen =3D=3D 0) break; /* End of requested data from client */ =20 - if ((got =3D saferead(fdin, buf, buflen)) < 0) { - virReportSystemError(errno, - _("Unable to read %s"), - fdinname); + while (doRead =3D=3D (fdst->msg !=3D NULL) && + !fdst->threadQuit) { + if (virCondWait(&fdst->threadCond, &fdst->parent.lock)) { + virReportSystemError(errno, "%s", + _("failed to wait on condition")); + goto error; + } + } + + if (fdst->threadQuit) { + /* If stream abort was requested, quit early. */ + if (fdst->threadAbort) + goto cleanup; + + /* Otherwise flush buffers and quit gracefully. */ + if (doRead =3D=3D (fdst->msg !=3D NULL)) + break; + } + + if (doRead) + got =3D virFDStreamThreadDoRead(fdst, fdin, fdinname, buflen); + else + got =3D virFDStreamThreadDoWrite(fdst, fdout, fdoutname); + + if (got < 0) goto error; - } =20 if (got =3D=3D 0) break; =20 total +=3D got; - - if (safewrite(fdout, buf, got) < 0) { - virReportSystemError(errno, - _("Unable to write %s"), - fdoutname); - goto error; - } } =20 cleanup: + fdst->threadQuit =3D true; + virObjectUnlock(fdst); if (!virObjectUnref(fdst)) st->privateData =3D NULL; virFDStreamThreadDataFree(data); - VIR_FREE(buf); return; =20 error: - virObjectLock(fdst); fdst->threadErr =3D errno; - virObjectUnlock(fdst); goto cleanup; } =20 @@ -367,6 +539,10 @@ virFDStreamJoinWorker(virFDStreamDataPtr fdst, bool st= reamAbort) if (!fdst->thread) return 0; =20 + fdst->threadAbort =3D streamAbort; + fdst->threadQuit =3D true; + virCondSignal(&fdst->threadCond); + /* Give the thread a chance to lock the FD stream object. */ virObjectUnlock(fdst); virThreadJoin(fdst->thread); @@ -380,6 +556,7 @@ virFDStreamJoinWorker(virFDStreamDataPtr fdst, bool str= eamAbort) ret =3D 0; cleanup: VIR_FREE(fdst->thread); + virCondDestroy(&fdst->threadCond); return ret; } =20 @@ -426,11 +603,14 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort) fdst->abortCallbackDispatching =3D false; } =20 - /* mutex locked */ - ret =3D VIR_CLOSE(fdst->fd); if (virFDStreamJoinWorker(fdst, streamAbort) < 0) ret =3D -1; =20 + /* mutex locked */ + if ((ret =3D VIR_CLOSE(fdst->fd)) < 0) + virReportSystemError(errno, "%s", + _("Unable to close")); + st->privateData =3D NULL; =20 /* call the internal stream closing callback */ @@ -467,7 +647,8 @@ virFDStreamAbort(virStreamPtr st) static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nby= tes) { virFDStreamDataPtr fdst =3D st->privateData; - int ret; + virFDStreamMsgPtr msg =3D NULL; + int ret =3D -1; =20 if (nbytes > INT_MAX) { virReportSystemError(ERANGE, "%s", @@ -495,25 +676,51 @@ static int virFDStreamWrite(virStreamPtr st, const ch= ar *bytes, size_t nbytes) nbytes =3D fdst->length - fdst->offset; } =20 - retry: - ret =3D write(fdst->fd, bytes, nbytes); - if (ret < 0) { - VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR - if (errno =3D=3D EAGAIN || errno =3D=3D EWOULDBLOCK) { - VIR_WARNINGS_RESET - ret =3D -2; - } else if (errno =3D=3D EINTR) { - goto retry; - } else { - ret =3D -1; - virReportSystemError(errno, "%s", + if (fdst->thread) { + char *buf; + + if (fdst->threadQuit) { + virReportSystemError(EBADF, "%s", _("cannot write to stream")); + return -1; + } + + if (VIR_ALLOC(msg) < 0 || + VIR_ALLOC_N(buf, nbytes) < 0) + goto cleanup; + + memcpy(buf, bytes, nbytes); + msg->type =3D VIR_FDSTREAM_MSG_TYPE_DATA; + msg->stream.data.buf =3D buf; + msg->stream.data.len =3D nbytes; + + virFDStreamMsgQueuePush(fdst, msg); + msg =3D NULL; + ret =3D nbytes; + } else { + retry: + ret =3D write(fdst->fd, bytes, nbytes); + if (ret < 0) { + VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR + if (errno =3D=3D EAGAIN || errno =3D=3D EWOULDBLOCK) { + VIR_WARNINGS_RESET + ret =3D -2; + } else if (errno =3D=3D EINTR) { + goto retry; + } else { + ret =3D -1; + virReportSystemError(errno, "%s", + _("cannot write to stream")); + } } - } else if (fdst->length) { - fdst->offset +=3D ret; } =20 + if (fdst->length) + fdst->offset +=3D ret; + + cleanup: virObjectUnlock(fdst); + virFDStreamMsgFree(msg); return ret; } =20 @@ -521,7 +728,7 @@ static int virFDStreamWrite(virStreamPtr st, const char= *bytes, size_t nbytes) static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes) { virFDStreamDataPtr fdst =3D st->privateData; - int ret; + int ret =3D -1; =20 if (nbytes > INT_MAX) { virReportSystemError(ERANGE, "%s", @@ -547,24 +754,70 @@ static int virFDStreamRead(virStreamPtr st, char *byt= es, size_t nbytes) nbytes =3D fdst->length - fdst->offset; } =20 - retry: - ret =3D read(fdst->fd, bytes, nbytes); - if (ret < 0) { - VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR - if (errno =3D=3D EAGAIN || errno =3D=3D EWOULDBLOCK) { - VIR_WARNINGS_RESET - ret =3D -2; - } else if (errno =3D=3D EINTR) { - goto retry; - } else { - ret =3D -1; - virReportSystemError(errno, "%s", - _("cannot read from stream")); + if (fdst->thread) { + virFDStreamMsgPtr msg =3D NULL; + + while (!(msg =3D fdst->msg)) { + if (fdst->threadQuit) { + if (nbytes) { + virReportSystemError(EBADF, "%s", + _("stream is not open")); + } else { + ret =3D 0; + } + goto cleanup; + } else { + virObjectUnlock(fdst); + virCondSignal(&fdst->threadCond); + virObjectLock(fdst); + } + } + + if (msg->type !=3D VIR_FDSTREAM_MSG_TYPE_DATA) { + /* Nope, nope, I'm outta here */ + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("unexpected message type")); + goto cleanup; + } + + if (nbytes > msg->stream.data.len - msg->stream.data.offset) + nbytes =3D msg->stream.data.len - msg->stream.data.offset; + + memcpy(bytes, + msg->stream.data.buf + msg->stream.data.offset, + nbytes); + + msg->stream.data.offset +=3D nbytes; + if (msg->stream.data.offset =3D=3D msg->stream.data.len) { + virFDStreamMsgQueuePop(fdst); + virFDStreamMsgFree(msg); + } + + ret =3D nbytes; + + } else { + retry: + ret =3D read(fdst->fd, bytes, nbytes); + if (ret < 0) { + VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR + if (errno =3D=3D EAGAIN || errno =3D=3D EWOULDBLOCK) { + VIR_WARNINGS_RESET + ret =3D -2; + } else if (errno =3D=3D EINTR) { + goto retry; + } else { + ret =3D -1; + virReportSystemError(errno, "%s", + _("cannot read from stream")); + } + goto cleanup; } - } else if (fdst->length) { - fdst->offset +=3D ret; } =20 + if (fdst->length) + fdst->offset +=3D ret; + + cleanup: virObjectUnlock(fdst); return ret; } @@ -593,7 +846,7 @@ static int virFDStreamOpenInternal(virStreamPtr st, if (virFDStreamDataInitialize() < 0) return -1; =20 - if ((st->flags & VIR_STREAM_NONBLOCK) && + if ((st->flags & VIR_STREAM_NONBLOCK) && !threadData && virSetNonBlock(fd) < 0) { virReportSystemError(errno, "%s", _("Unable to set non-blocking mo= de")); return -1; @@ -609,11 +862,20 @@ static int virFDStreamOpenInternal(virStreamPtr st, st->privateData =3D fdst; =20 if (threadData) { + /* The thread is going to do reads if fdin is set and fdout is not= . */ + fdst->threadDoRead =3D threadData->fdout =3D=3D -1; + /* Create the thread after fdst and st were initialized. * The thread worker expects them to be that way. */ if (VIR_ALLOC(fdst->thread) < 0) goto error; =20 + if (virCondInit(&fdst->threadCond) < 0) { + virReportSystemError(errno, "%s", + _("cannot initialize condition variable")= ); + goto error; + } + if (virThreadCreate(fdst->thread, true, virFDStreamThread, @@ -752,8 +1014,6 @@ virFDStreamOpenFileInternal(virStreamPtr st, if ((st->flags & VIR_STREAM_NONBLOCK) && ((!S_ISCHR(sb.st_mode) && !S_ISFIFO(sb.st_mode)) || forceIOHelper)) { - int fds[2] =3D { -1, -1 }; - if ((oflags & O_ACCMODE) =3D=3D O_RDWR) { virReportError(VIR_ERR_INTERNAL_ERROR, _("%s: Cannot request read and write flags toge= ther"), @@ -761,12 +1021,6 @@ virFDStreamOpenFileInternal(virStreamPtr st, goto error; } =20 - if (pipe(fds) < 0) { - virReportSystemError(errno, "%s", - _("Unable to create pipe")); - goto error; - } - if (VIR_ALLOC(threadData) < 0) goto error; =20 @@ -775,18 +1029,14 @@ virFDStreamOpenFileInternal(virStreamPtr st, =20 if ((oflags & O_ACCMODE) =3D=3D O_RDONLY) { threadData->fdin =3D fd; - threadData->fdout =3D fds[1]; - if (VIR_STRDUP(threadData->fdinname, path) < 0 || - VIR_STRDUP(threadData->fdoutname, "pipe") < 0) + threadData->fdout =3D -1; + if (VIR_STRDUP(threadData->fdinname, path) < 0) goto error; - fd =3D fds[0]; } else { - threadData->fdin =3D fds[0]; + threadData->fdin =3D -1; threadData->fdout =3D fd; - if (VIR_STRDUP(threadData->fdinname, "pipe") < 0 || - VIR_STRDUP(threadData->fdoutname, path) < 0) + if (VIR_STRDUP(threadData->fdoutname, path) < 0) goto error; - fd =3D fds[1]; } } =20 --=20 2.10.2 -- libvir-list mailing list libvir-list@redhat.com https://www.redhat.com/mailman/listinfo/libvir-list