From: Michal Privoznik
To: libvir-list@redhat.com
Date: Thu, 20 Apr 2017 12:01:34 +0200
Message-Id: <116f5340c99bf004ba27b299840abd3cc74b6638.1492682033.git.mprivozn@redhat.com>
Subject: [libvirt] [PATCH v2 05/38] virfdstream: Drop iohelper in favour of a thread

Currently we use iohelper for the virFDStream implementation. This is
because UNIX I/O can be misleading: even though an FD for a file or
block device is set as non-blocking, the actual read()/write() can
still block. To avoid this, a pipe is created; one end is kept for
read/write while the other is handed over to iohelper, which
writes/reads the data for us. Thus it is iohelper that gets blocked
and not our event loop.

This approach has two problems:

1) we are spawning a new process.
2) any exchange of information between the daemon and iohelper can
   be done only through the pipe.

Therefore, iohelper is replaced with a thread that is created just
for the lifetime of the stream. The data is still transferred through
a pipe (for now), but both problems described above are solved.

Signed-off-by: Michal Privoznik
---
 src/util/virfdstream.c | 245 +++++++++++++++++++++++++++++++------------------
 src/util/virfdstream.h |   1 -
 2 files changed, 158 insertions(+), 88 deletions(-)

diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c
index 9a4a7ff..7a8d65d 100644
--- a/src/util/virfdstream.c
+++ b/src/util/virfdstream.c
@@ -56,8 +56,6 @@ struct virFDStreamData {
     virObjectLockable parent;
 
     int fd;
-    int errfd;
-    virCommandPtr cmd;
     unsigned long long offset;
     unsigned long long length;
 
@@ -79,6 +77,11 @@ struct virFDStreamData {
     virFDStreamInternalCloseCb icbCb;
     virFDStreamInternalCloseCbFreeOpaque icbFreeOpaque;
     void *icbOpaque;
+
+    /* Thread data */
+    virThreadPtr thread;
+    int threadErr;
+    bool threadQuit;
 };
 
 static virClassPtr virFDStreamDataClass;
@@ -264,57 +267,123 @@ virFDStreamAddCallback(virStreamPtr st,
     return ret;
 }
 
+
+typedef struct _virFDStreamThreadData virFDStreamThreadData;
+typedef virFDStreamThreadData *virFDStreamThreadDataPtr;
+struct _virFDStreamThreadData {
+    virStreamPtr st;
+    size_t length;
+    int fdin;
+    char *fdinname;
+    int fdout;
+    char *fdoutname;
+};
+
+
+static void
+virFDStreamThreadDataFree(virFDStreamThreadDataPtr data)
+{
+    if (!data)
+        return;
+
+    virObjectUnref(data->st);
+    VIR_FREE(data->fdinname);
+    VIR_FREE(data->fdoutname);
+    VIR_FREE(data);
+}
+
+
+static void
+virFDStreamThread(void *opaque)
+{
+    virFDStreamThreadDataPtr data = opaque;
+    virStreamPtr st = data->st;
+    size_t length = data->length;
+    int fdin = data->fdin;
+    char *fdinname = data->fdinname;
+    int fdout = data->fdout;
+    char *fdoutname = data->fdoutname;
+    virFDStreamDataPtr fdst = st->privateData;
+    char *buf = NULL;
+    size_t buflen = 256 * 1024;
+    size_t total = 0;
+
+    virObjectRef(fdst);
+
+    if (VIR_ALLOC_N(buf, buflen) < 0)
+        goto error;
+
+    while (1) {
+        ssize_t got;
+
+        if (length &&
+            (length - total) < buflen)
+            buflen = length - total;
+
+        if (buflen == 0)
+            break; /* End of requested data from client */
+
+        if ((got = saferead(fdin, buf, buflen)) < 0) {
+            virReportSystemError(errno,
+                                 _("Unable to read %s"),
+                                 fdinname);
+            goto error;
+        }
+
+        if (got == 0)
+            break;
+
+        total += got;
+
+        if (safewrite(fdout, buf, got) < 0) {
+            virReportSystemError(errno,
+                                 _("Unable to write %s"),
+                                 fdoutname);
+            goto error;
+        }
+    }
+
+ cleanup:
+    if (!virObjectUnref(fdst))
+        st->privateData = NULL;
+    VIR_FORCE_CLOSE(fdin);
+    VIR_FORCE_CLOSE(fdout);
+    virFDStreamThreadDataFree(data);
+    VIR_FREE(buf);
+    return;
+
+ error:
+    virObjectLock(fdst);
+    fdst->threadErr = errno;
+    virObjectUnlock(fdst);
+    goto cleanup;
+}
+
+
 static int
-virFDStreamCloseCommand(virFDStreamDataPtr fdst, bool streamAbort)
+virFDStreamJoinWorker(virFDStreamDataPtr fdst, bool streamAbort)
 {
-    char buf[1024];
-    ssize_t len;
-    int status;
     int ret = -1;
-
-    if (!fdst->cmd)
+    if (!fdst->thread)
         return 0;
 
-    if ((len = saferead(fdst->errfd, buf, sizeof(buf)-1)) < 0)
-        buf[0] = '\0';
-    else
-        buf[len] = '\0';
+    /* Give the thread a chance to lock the FD stream object. */
+    virObjectUnlock(fdst);
+    virThreadJoin(fdst->thread);
+    virObjectLock(fdst);
 
-    virCommandRawStatus(fdst->cmd);
-    if (virCommandWait(fdst->cmd, &status) < 0)
-        goto cleanup;
-
-    if (status != 0) {
-        if (buf[0] != '\0') {
-            virReportError(VIR_ERR_INTERNAL_ERROR, "%s", buf);
-        } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGPIPE) {
-            if (streamAbort) {
-                /* Explicit abort request means the caller doesn't care
-                   if there's data left over, so skip the error */
-                goto out;
-            }
-
-            virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
-                           _("I/O helper exited "
-                             "before all data was processed"));
-        } else {
-            char *str = virProcessTranslateStatus(status);
-            virReportError(VIR_ERR_INTERNAL_ERROR,
-                           _("I/O helper exited with %s"),
-                           NULLSTR(str));
-            VIR_FREE(str);
-        }
+    if (fdst->threadErr && !streamAbort) {
+        /* errors are expected on streamAbort */
         goto cleanup;
     }
 
- out:
     ret = 0;
  cleanup:
-    virCommandFree(fdst->cmd);
-    fdst->cmd = NULL;
+    VIR_FREE(fdst->thread);
     return ret;
 }
 
+
 static int
 virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
 {
@@ -359,12 +428,9 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
 
     /* mutex locked */
     ret = VIR_CLOSE(fdst->fd);
-    if (virFDStreamCloseCommand(fdst, streamAbort) < 0)
+    if (virFDStreamJoinWorker(fdst, streamAbort) < 0)
         ret = -1;
 
-    if (VIR_CLOSE(fdst->errfd) < 0)
-        VIR_DEBUG("ignoring failed close on fd %d", fdst->errfd);
-
     st->privateData = NULL;
 
     /* call the internal stream closing callback */
@@ -516,14 +582,13 @@ static virStreamDriver virFDStreamDrv = {
 
 static int virFDStreamOpenInternal(virStreamPtr st,
                                    int fd,
-                                   virCommandPtr cmd,
-                                   int errfd,
+                                   virFDStreamThreadDataPtr threadData,
                                    unsigned long long length)
 {
     virFDStreamDataPtr fdst;
 
-    VIR_DEBUG("st=%p fd=%d cmd=%p errfd=%d length=%llu",
-              st, fd, cmd, errfd, length);
+    VIR_DEBUG("st=%p fd=%d threadData=%p length=%llu",
+              st, fd, threadData, length);
 
     if (virFDStreamDataInitialize() < 0)
         return -1;
@@ -538,21 +603,39 @@ static int virFDStreamOpenInternal(virStreamPtr st,
         return -1;
 
     fdst->fd = fd;
-    fdst->cmd = cmd;
-    fdst->errfd = errfd;
     fdst->length = length;
 
     st->driver = &virFDStreamDrv;
     st->privateData = fdst;
 
+    if (threadData) {
+        /* Create the thread after fdst and st were initialized.
+         * The thread worker expects them to be that way. */
+        if (VIR_ALLOC(fdst->thread) < 0)
+            goto error;
+
+        if (virThreadCreate(fdst->thread,
+                            true,
+                            virFDStreamThread,
+                            threadData) < 0)
+            goto error;
+    }
+
     return 0;
+
+ error:
+    VIR_FREE(fdst->thread);
+    st->driver = NULL;
+    st->privateData = NULL;
+    virObjectUnref(fdst);
+    return -1;
 }
 
 
 int virFDStreamOpen(virStreamPtr st,
                     int fd)
 {
-    return virFDStreamOpenInternal(st, fd, NULL, -1, 0);
+    return virFDStreamOpenInternal(st, fd, NULL, 0);
 }
 
 
@@ -598,7 +681,7 @@ int virFDStreamConnectUNIX(virStreamPtr st,
         goto error;
     }
 
-    if (virFDStreamOpenInternal(st, fd, NULL, -1, 0) < 0)
+    if (virFDStreamOpenInternal(st, fd, NULL, 0) < 0)
         goto error;
     return 0;
 
@@ -627,11 +710,10 @@ virFDStreamOpenFileInternal(virStreamPtr st,
                             bool forceIOHelper)
 {
     int fd = -1;
-    int childfd = -1;
+    int pipefds[2] = { -1, -1 };
+    int tmpfd = -1;
     struct stat sb;
-    virCommandPtr cmd = NULL;
-    int errfd = -1;
-    char *iohelper_path = NULL;
+    virFDStreamThreadDataPtr threadData = NULL;
 
     VIR_DEBUG("st=%p path=%s oflags=%x offset=%llu length=%llu mode=%o",
               st, path, oflags, offset, length, mode);
@@ -648,6 +730,7 @@ virFDStreamOpenFileInternal(virStreamPtr st,
                              path);
         return -1;
     }
+    tmpfd = fd;
 
     if (fstat(fd, &sb) < 0) {
         virReportSystemError(errno,
@@ -672,7 +755,6 @@ virFDStreamOpenFileInternal(virStreamPtr st,
     if ((st->flags & VIR_STREAM_NONBLOCK) &&
         ((!S_ISCHR(sb.st_mode) &&
           !S_ISFIFO(sb.st_mode)) || forceIOHelper)) {
-        int fds[2] = { -1, -1 };
 
         if ((oflags & O_ACCMODE) == O_RDWR) {
             virReportError(VIR_ERR_INTERNAL_ERROR,
@@ -681,58 +763,47 @@ virFDStreamOpenFileInternal(virStreamPtr st,
             goto error;
         }
 
-        if (pipe(fds) < 0) {
+        if (pipe(pipefds) < 0) {
             virReportSystemError(errno, "%s",
                                  _("Unable to create pipe"));
             goto error;
         }
 
-        if (!(iohelper_path = virFileFindResource("libvirt_iohelper",
-                                                  abs_topbuilddir "/src",
-                                                  LIBEXECDIR)))
+        if (VIR_ALLOC(threadData) < 0)
             goto error;
 
-        cmd = virCommandNewArgList(iohelper_path,
-                                   path,
-                                   NULL);
-
-        VIR_FREE(iohelper_path);
-
-        virCommandAddArgFormat(cmd, "%llu", length);
-        virCommandPassFD(cmd, fd,
-                         VIR_COMMAND_PASS_FD_CLOSE_PARENT);
-        virCommandAddArgFormat(cmd, "%d", fd);
+        threadData->st = virObjectRef(st);
+        threadData->length = length;
 
         if ((oflags & O_ACCMODE) == O_RDONLY) {
-            childfd = fds[1];
-            fd = fds[0];
-            virCommandSetOutputFD(cmd, &childfd);
+            threadData->fdin = fd;
+            threadData->fdout = pipefds[1];
+            if (VIR_STRDUP(threadData->fdinname, path) < 0 ||
+                VIR_STRDUP(threadData->fdoutname, "pipe") < 0)
+                goto error;
+            tmpfd = pipefds[0];
         } else {
-            childfd = fds[0];
-            fd = fds[1];
-            virCommandSetInputFD(cmd, childfd);
+            threadData->fdin = pipefds[0];
+            threadData->fdout = fd;
+            if (VIR_STRDUP(threadData->fdinname, "pipe") < 0 ||
+                VIR_STRDUP(threadData->fdoutname, path) < 0)
+                goto error;
+            tmpfd = pipefds[1];
         }
-        virCommandSetErrorFD(cmd, &errfd);
-
-        if (virCommandRunAsync(cmd, NULL) < 0)
-            goto error;
-
-        VIR_FORCE_CLOSE(childfd);
     }
 
-    if (virFDStreamOpenInternal(st, fd, cmd, errfd, length) < 0)
+    if (virFDStreamOpenInternal(st, tmpfd, threadData, length) < 0)
         goto error;
 
     return 0;
 
  error:
-    virCommandFree(cmd);
     VIR_FORCE_CLOSE(fd);
-    VIR_FORCE_CLOSE(childfd);
-    VIR_FORCE_CLOSE(errfd);
-    VIR_FREE(iohelper_path);
+    VIR_FORCE_CLOSE(pipefds[0]);
+    VIR_FORCE_CLOSE(pipefds[1]);
     if (oflags & O_CREAT)
         unlink(path);
+    virFDStreamThreadDataFree(threadData);
     return -1;
 }
 
diff --git a/src/util/virfdstream.h b/src/util/virfdstream.h
index 32a741e..34c4c3f 100644
--- a/src/util/virfdstream.h
+++ b/src/util/virfdstream.h
@@ -24,7 +24,6 @@
 # define __VIR_FDSTREAM_H_
 
 # include "internal.h"
-# include "vircommand.h"
 
 /* internal callback, the generic one is used up by daemon stream driver */
 /* the close callback is called with fdstream private data locked */
-- 
2.10.2
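
For readers who want the shape of the thread-plus-pipe design without the libvirt helpers, the following is a minimal sketch in plain POSIX C. It is not the code from this patch: the names copy_job, write_all, copy_worker and start_reader are hypothetical, and pthread_create(), read() and write() stand in for virThreadCreate(), saferead() and safewrite(). It only covers the read direction (file to pipe) and uses a small fixed buffer instead of the 256 KiB one above.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical stand-in for virFDStreamThreadData; not a libvirt type. */
struct copy_job {
    int fdin;       /* descriptor the worker reads from (may block) */
    int fdout;      /* descriptor the worker writes to */
    size_t length;  /* maximum bytes to copy, 0 means "until EOF" */
    int err;        /* errno recorded by the worker, 0 on success */
};

/* Write the whole buffer, retrying on EINTR and short writes. */
static int write_all(int fd, const char *buf, size_t len)
{
    while (len > 0) {
        ssize_t n = write(fd, buf, len);
        if (n < 0) {
            if (errno == EINTR)
                continue;
            return -1;
        }
        buf += n;
        len -= (size_t)n;
    }
    return 0;
}

/* Worker: blocking copy loop that runs off the event-loop thread. */
static void *copy_worker(void *opaque)
{
    struct copy_job *job = opaque;
    char buf[4096];
    size_t total = 0;

    for (;;) {
        size_t want = sizeof(buf);
        ssize_t got;

        if (job->length && job->length - total < want)
            want = job->length - total;
        if (want == 0)
            break;                      /* requested length reached */

        got = read(job->fdin, buf, want);
        if (got < 0) {
            if (errno == EINTR)
                continue;
            job->err = errno;           /* remember why the copy failed */
            break;
        }
        if (got == 0)
            break;                      /* end of input */

        total += (size_t)got;
        if (write_all(job->fdout, buf, (size_t)got) < 0) {
            job->err = errno;
            break;
        }
    }

    close(job->fdin);
    close(job->fdout);                  /* reader on the pipe sees EOF */
    return NULL;
}

/* Start a worker feeding data from srcfd into a new pipe.
 * Returns the read end of the pipe, or -1 on failure (srcfd is
 * left open in that case and remains owned by the caller). */
static int start_reader(struct copy_job *job, pthread_t *tid,
                        int srcfd, size_t length)
{
    int pipefds[2];

    assert(job != NULL && tid != NULL);

    if (pipe(pipefds) < 0)
        return -1;

    job->fdin = srcfd;
    job->fdout = pipefds[1];
    job->length = length;
    job->err = 0;

    if (pthread_create(tid, NULL, copy_worker, job) != 0) {
        close(pipefds[0]);
        close(pipefds[1]);
        return -1;
    }
    return pipefds[0];
}
```

A caller would poll the descriptor returned by start_reader() from its event loop, and on close call pthread_join() before inspecting job->err, which loosely mirrors what virFDStreamJoinWorker() does with threadErr. Unlike the patch, the sketch reads job->err only after the join and therefore takes no lock.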