[libvirt RFCv11 14/33] virfile: add new API virFileDiskCopyChannel

Claudio Fontana posted 33 patches 3 years, 8 months ago
There is a newer version of this series
[libvirt RFCv11 14/33] virfile: add new API virFileDiskCopyChannel
Posted by Claudio Fontana 3 years, 8 months ago
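Allow interleaved parallel reads and writes of a single file by multiple
channels, using a record size equal to the I/O buffer size (1MB).

virFileDiskCopy no longer closes disk_fd; callers must now close it
themselves (iohelper is updated accordingly).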

Signed-off-by: Claudio Fontana <cfontana@suse.de>
---
 src/util/iohelper.c |   3 +
 src/util/virfile.c  | 151 +++++++++++++++++++++++++++++---------------
 src/util/virfile.h  |   2 +
 3 files changed, 106 insertions(+), 50 deletions(-)

diff --git a/src/util/iohelper.c b/src/util/iohelper.c
index 055540c8c4..dcbdda366f 100644
--- a/src/util/iohelper.c
+++ b/src/util/iohelper.c
@@ -85,6 +85,9 @@ main(int argc, char **argv)
     if (fd < 0 || virFileDiskCopy(fd, path, -1, "stdio") < 0)
         goto error;
 
+    if (VIR_CLOSE(fd) < 0)
+        goto error;
+
     return 0;
 
  error:
diff --git a/src/util/virfile.c b/src/util/virfile.c
index 201d7f4e64..f9ae7d94c4 100644
--- a/src/util/virfile.c
+++ b/src/util/virfile.c
@@ -4761,6 +4761,9 @@ struct runIOParams {
     const char *fdinname;
     int fdout;
     const char *fdoutname;
+    int idx;
+    int nchannels;
+    off_t total;
 };
 
 /**
@@ -4779,12 +4782,18 @@ runIOCopy(const struct runIOParams p)
     off_t total = 0;
     size_t buflen = 1024*1024;
     char *buf = virFileDirectBufferNew(&base, buflen);
+    int diskfd = p.isWrite ? p.fdout : p.fdin;
 
     if (!buf) {
         virReportSystemError(errno, _("Failed to allocate aligned memory in function %s"), __FUNCTION__);
         return -5;
     }
-
+    if (p.idx >= 0) {
+        if (lseek(diskfd, p.idx * buflen, SEEK_CUR) < 0) {
+            virReportSystemError(errno, "%s", _("Failed to lseek to file channel offset"));
+            return -6;
+        }
+    }
     while (1) {
         ssize_t got;
 
@@ -4808,7 +4817,12 @@ runIOCopy(const struct runIOParams p)
             break;
 
         total += got;
-
+        if (p.idx >= 0 && !p.isWrite && total > p.total) {
+            /* do not write to socket too much for this channel, according to CLIA */
+            off_t difference = total - p.total;
+            got -= difference;
+            total -= difference;
+        }
         /* handle last write size align in direct case */
         if (got < buflen && p.isDirect && p.isWrite) {
             ssize_t nwritten = virFileDirectWrite(p.fdout, buf, got);
@@ -4816,7 +4830,7 @@ runIOCopy(const struct runIOParams p)
                 virReportSystemError(errno, _("Unable to write %s"), p.fdoutname);
                 return -3;
             }
-            if (!p.isBlockDev) {
+            if (!p.isBlockDev && p.idx < 0) {
                 off_t off = lseek(p.fdout, (off_t)0, SEEK_CUR);
                 if (off < 0) {
                     virReportSystemError(errno, "%s", _("Failed to lseek to get current file offset"));
@@ -4824,6 +4838,7 @@ runIOCopy(const struct runIOParams p)
                 }
                 if (nwritten > got) {
                     off -= nwritten - got;
+                    total -= nwritten - got;
                 }
                 if (ftruncate(p.fdout, off) < 0) {
                     virReportSystemError(errno, _("Unable to truncate %s"), p.fdoutname);
@@ -4838,51 +4853,61 @@ runIOCopy(const struct runIOParams p)
             virReportSystemError(errno, _("Unable to write %s"), p.fdoutname);
             return -3;
         }
+        if (p.idx >= 0) {
+            if (!p.isWrite && total >= p.total) {
+                /* done for this channel */
+                break;
+            }
+            /* move channel cursor to the next record */
+            if (lseek(diskfd, buflen * (p.nchannels - 1), SEEK_CUR) < 0) {
+                virReportSystemError(errno, "%s", _("Failed to lseek to next channel record"));
+                return -7;
+            }
+        }
     }
     return total;
 }
 
 /**
- * virFileDiskCopy: run IO to copy data between storage and a pipe or socket.
- *
- * @disk_fd:     the already open regular file or block device
- * @disk_path:   the pathname corresponding to disk_fd (for error reporting)
- * @remote_fd:   the pipe or socket
- *               Use -1 to auto-choose between STDIN or STDOUT.
- * @remote_path: the pathname corresponding to remote_fd (for error reporting)
- *
- * Note that the direction of the transfer is detected based on the @disk_fd
- * file access mode (man 2 open). Therefore @disk_fd must be opened with
- * O_RDONLY or O_WRONLY. O_RDWR is not supported.
- *
- * virFileDiskCopy always closes the file descriptor disk_fd,
- * and any error during close(2) is reported and considered a failure.
- *
- * Returns: bytes transferred or < 0 on failure.
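+ * virFileDiskCopyChannel: like virFileDiskCopy, but with interleaved 1MB records per channel
+ * @total:     when reading, the number of bytes this channel must transfer (unused for writes)
+ * @idx:       channel index (0 <= idx < nchannels), or -1 when not using channels
+ * @nchannels: total number of channels, or -1 when not using channels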
  */
 
 off_t
-virFileDiskCopy(int disk_fd, const char *disk_path, int remote_fd, const char *remote_path)
+virFileDiskCopyChannel(int disk_fd, const char *disk_path, int remote_fd, const char *remote_path,
+                       int idx, int nchannels, off_t total)
 {
-    int ret = -1;
-    off_t total = 0;
+    off_t new_total = -1;
     struct stat sb;
     struct runIOParams p;
     int oflags = -1;
 
+    if ((nchannels == 0) ||
+        (nchannels > 0 && idx >= nchannels) ||
+        (nchannels > 0 && idx < 0) ||
+        (nchannels < 0 && idx >= 0)) {
+        virReportSystemError(EINVAL, "%s", _("Invalid channel arguments"));
+        goto out;
+    }
+    p.idx = idx;
+    p.nchannels = nchannels;
+    p.total = total;
+
     oflags = fcntl(disk_fd, F_GETFL);
 
     if (oflags < 0) {
         virReportSystemError(errno,
                              _("unable to determine access mode of %s"),
                              disk_path);
-        goto cleanup;
+        goto out;
     }
     if (fstat(disk_fd, &sb) < 0) {
         virReportSystemError(errno,
                              _("unable to stat file descriptor %d path %s"),
                              disk_fd, disk_path);
-        goto cleanup;
+        goto out;
     }
     p.isBlockDev = S_ISBLK(sb.st_mode);
     p.isDirect = O_DIRECT && (oflags & O_DIRECT);
@@ -4906,53 +4931,79 @@ virFileDiskCopy(int disk_fd, const char *disk_path, int remote_fd, const char *r
     default:
         virReportSystemError(EINVAL, _("Unable to process file with flags %d"),
                              (oflags & O_ACCMODE));
-        goto cleanup;
+        goto out;
     }
     if (!p.isBlockDev && p.isDirect) {
         off_t off = lseek(disk_fd, 0, SEEK_CUR);
         if (off < 0) {
             virReportSystemError(errno, "%s", _("O_DIRECT needs a seekable file"));
-            goto cleanup;
+            goto out;
         }
         if (virFileDirectAlign(off) != off) {
             /* we could write some zeroes, but maybe it is safer to just fail */
             virReportSystemError(EINVAL, "%s", _("O_DIRECT attempted with unaligned file pointer"));
-            goto cleanup;
+            goto out;
         }
     }
-    total = runIOCopy(p);
-    if (total < 0)
-        goto cleanup;
-
-    /* Ensure all data is written */
-    if (virFileDataSync(p.fdout) < 0) {
-        if (errno != EINVAL && errno != EROFS) {
-            /* fdatasync() may fail on some special FDs, e.g. pipes */
-            virReportSystemError(errno, _("unable to fsync %s"), p.fdoutname);
-            goto cleanup;
+    new_total = runIOCopy(p);
+    if (new_total < 0)
+        goto out;
+
+    if (p.idx < 0 && p.isWrite) {
+        /* without channels we can run the fdatasync here */
+        if (virFileDataSync(disk_fd) < 0) {
+            if (errno != EINVAL && errno != EROFS) {
+                virReportSystemError(errno, _("unable to fsyncdata %s"), p.fdoutname);
+                new_total = -1;
+                goto out;
+            }
         }
     }
 
-    ret = 0;
-
- cleanup:
-    if (VIR_CLOSE(disk_fd) < 0 && ret == 0) {
-        virReportSystemError(errno, _("Unable to close %s"), disk_path);
-        ret = -1;
-    }
-    return ret;
+ out:
+    return new_total;
 }
 
 #else /* WIN32 */
 
 off_t
-virFileDiskCopy(int disk_fd G_GNUC_UNUSED,
-                const char *disk_path G_GNUC_UNUSED,
-                int remote_fd G_GNUC_UNUSED,
-                const char *remote_path G_GNUC_UNUSED)
+virFileDiskCopyChannel(int disk_fd G_GNUC_UNUSED,
+                       const char *disk_path G_GNUC_UNUSED,
+                       int remote_fd G_GNUC_UNUSED,
+                       const char *remote_path G_GNUC_UNUSED,
+                       int idx G_GNUC_UNUSED,
+                       int nchannels G_GNUC_UNUSED,
+                       off_t total G_GNUC_UNUSED)
 {
     virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
-                   _("virFileDiskCopy unsupported on this platform"));
+                   _("virFileDiskCopyChannel unsupported on this platform"));
     return -1;
 }
 #endif /* WIN32 */
+
+/**
+ * virFileDiskCopy: run IO to copy data between storage and a pipe or socket.
+ *
+ * @disk_fd:     the already open regular file or block device
+ * @disk_path:   the pathname corresponding to disk_fd (for error reporting)
+ * @remote_fd:   the pipe or socket
+ *               Use -1 to auto-choose between STDIN or STDOUT.
+ * @remote_path: the pathname corresponding to remote_fd (for error reporting)
+ *
+ * Note that the direction of the transfer is detected based on the @disk_fd
+ * file access mode (man 2 open). Therefore @disk_fd must be opened with
+ * O_RDONLY or O_WRONLY. O_RDWR is not supported.
+ *
+ * virFileDiskCopy does not close disk_fd: the caller is responsible for
+ * closing it and for treating any close(2) error as a failure.
+ *
+ * Returns: bytes transferred or < 0 on failure.
+ */
+
+off_t
+virFileDiskCopy(int disk_fd, const char *disk_path,
+                int remote_fd, const char *remote_path)
+{
+    return virFileDiskCopyChannel(disk_fd, disk_path, remote_fd, remote_path,
+                                  -1, -1, 0);
+}
diff --git a/src/util/virfile.h b/src/util/virfile.h
index 844261e0a4..4d75389c84 100644
--- a/src/util/virfile.h
+++ b/src/util/virfile.h
@@ -394,3 +394,5 @@ int virFileSetCOW(const char *path,
                   virTristateBool state);
 
 off_t virFileDiskCopy(int disk_fd, const char *disk_path, int remote_fd, const char *remote_path);
+off_t virFileDiskCopyChannel(int disk_fd, const char *disk_path, int remote_fd, const char *remote_path,
+                             int idx, int nchannels, off_t total);
-- 
2.26.2
Re: [libvirt RFCv11 14/33] virfile: add new API virFileDiskCopyChannel
Posted by Claudio Fontana 3 years, 8 months ago
On 6/7/22 11:19, Claudio Fontana wrote:
> allow interleaved parallel write to a single file,
> using a record size equal to the io buffer size (1MB).
> 
> Signed-off-by: Claudio Fontana <cfontana@suse.de>
> ---
>  src/util/iohelper.c |   3 +
>  src/util/virfile.c  | 151 +++++++++++++++++++++++++++++---------------
>  src/util/virfile.h  |   2 +
>  3 files changed, 106 insertions(+), 50 deletions(-)
> 
> diff --git a/src/util/iohelper.c b/src/util/iohelper.c
> index 055540c8c4..dcbdda366f 100644
> --- a/src/util/iohelper.c
> +++ b/src/util/iohelper.c
> @@ -85,6 +85,9 @@ main(int argc, char **argv)
>      if (fd < 0 || virFileDiskCopy(fd, path, -1, "stdio") < 0)
>          goto error;
>  
> +    if (VIR_CLOSE(fd) < 0)
> +        goto error;
> +
>      return 0;
>  
>   error:
> diff --git a/src/util/virfile.c b/src/util/virfile.c
> index 201d7f4e64..f9ae7d94c4 100644
> --- a/src/util/virfile.c
> +++ b/src/util/virfile.c
> @@ -4761,6 +4761,9 @@ struct runIOParams {
>      const char *fdinname;
>      int fdout;
>      const char *fdoutname;
> +    int idx;
> +    int nchannels;
> +    off_t total;
>  };
>  
>  /**
> @@ -4779,12 +4782,18 @@ runIOCopy(const struct runIOParams p)
>      off_t total = 0;
>      size_t buflen = 1024*1024;
>      char *buf = virFileDirectBufferNew(&base, buflen);
> +    int diskfd = p.isWrite ? p.fdout : p.fdin;
>  
>      if (!buf) {
>          virReportSystemError(errno, _("Failed to allocate aligned memory in function %s"), __FUNCTION__);
>          return -5;
>      }
> -
> +    if (p.idx >= 0) {
> +        if (lseek(diskfd, p.idx * buflen, SEEK_CUR) < 0) {
> +            virReportSystemError(errno, "%s", _("Failed to lseek to file channel offset"));
> +            return -6;
> +        }
> +    }
>      while (1) {
>          ssize_t got;
>  
> @@ -4808,7 +4817,12 @@ runIOCopy(const struct runIOParams p)
>              break;
>  
>          total += got;
> -
> +        if (p.idx >= 0 && !p.isWrite && total > p.total) {
> +            /* do not write to socket too much for this channel, according to CLIA */
> +            off_t difference = total - p.total;
> +            got -= difference;
> +            total -= difference;
> +        }
>          /* handle last write size align in direct case */
>          if (got < buflen && p.isDirect && p.isWrite) {
>              ssize_t nwritten = virFileDirectWrite(p.fdout, buf, got);
> @@ -4816,7 +4830,7 @@ runIOCopy(const struct runIOParams p)
>                  virReportSystemError(errno, _("Unable to write %s"), p.fdoutname);
>                  return -3;
>              }
> -            if (!p.isBlockDev) {
> +            if (!p.isBlockDev && p.idx < 0) {
>                  off_t off = lseek(p.fdout, (off_t)0, SEEK_CUR);
>                  if (off < 0) {
>                      virReportSystemError(errno, "%s", _("Failed to lseek to get current file offset"));
> @@ -4824,6 +4838,7 @@ runIOCopy(const struct runIOParams p)
>                  }
>                  if (nwritten > got) {
>                      off -= nwritten - got;
> +                    total -= nwritten - got;
>                  }
>                  if (ftruncate(p.fdout, off) < 0) {
>                      virReportSystemError(errno, _("Unable to truncate %s"), p.fdoutname);
> @@ -4838,51 +4853,61 @@ runIOCopy(const struct runIOParams p)
>              virReportSystemError(errno, _("Unable to write %s"), p.fdoutname);
>              return -3;
>          }
> +        if (p.idx >= 0) {
> +            if (!p.isWrite && total >= p.total) {
> +                /* done for this channel */
> +                break;
> +            }
> +            /* move channel cursor to the next record */
> +            if (lseek(diskfd, buflen * (p.nchannels - 1), SEEK_CUR) < 0) {
> +                virReportSystemError(errno, "%s", _("Failed to lseek to next channel record"));
> +                return -7;
> +            }
> +        }
>      }
>      return total;
>  }
>  
>  /**
> - * virFileDiskCopy: run IO to copy data between storage and a pipe or socket.
> - *
> - * @disk_fd:     the already open regular file or block device
> - * @disk_path:   the pathname corresponding to disk_fd (for error reporting)
> - * @remote_fd:   the pipe or socket
> - *               Use -1 to auto-choose between STDIN or STDOUT.
> - * @remote_path: the pathname corresponding to remote_fd (for error reporting)
> - *
> - * Note that the direction of the transfer is detected based on the @disk_fd
> - * file access mode (man 2 open). Therefore @disk_fd must be opened with
> - * O_RDONLY or O_WRONLY. O_RDWR is not supported.
> - *
> - * virFileDiskCopy always closes the file descriptor disk_fd,
> - * and any error during close(2) is reported and considered a failure.
> - *
> - * Returns: bytes transferred or < 0 on failure.
> + * virFileDiskCopyChannel: like virFileDiskCopy, channel interleaved read/write
> + * ...
> + * @idx:       channel index
> + * @nchannels: total number of channels
>   */
>  
>  off_t
> -virFileDiskCopy(int disk_fd, const char *disk_path, int remote_fd, const char *remote_path)
> +virFileDiskCopyChannel(int disk_fd, const char *disk_path, int remote_fd, const char *remote_path,
> +                       int idx, int nchannels, off_t total)
>  {
> -    int ret = -1;
> -    off_t total = 0;
> +    off_t new_total = -1;
>      struct stat sb;
>      struct runIOParams p;
>      int oflags = -1;
>  
> +    if ((nchannels == 0) ||
> +        (nchannels > 0 && idx >= nchannels) ||
> +        (nchannels > 0 && idx < 0) ||
> +        (nchannels < 0 && idx >= 0)) {
> +        virReportSystemError(EINVAL, "%s", _("Invalid channel arguments"));
> +        goto out;
> +    }
> +    p.idx = idx;
> +    p.nchannels = nchannels;
> +    p.total = total;
> +
>      oflags = fcntl(disk_fd, F_GETFL);
>  
>      if (oflags < 0) {
>          virReportSystemError(errno,
>                               _("unable to determine access mode of %s"),
>                               disk_path);
> -        goto cleanup;
> +        goto out;
>      }
>      if (fstat(disk_fd, &sb) < 0) {
>          virReportSystemError(errno,
>                               _("unable to stat file descriptor %d path %s"),
>                               disk_fd, disk_path);
> -        goto cleanup;
> +        goto out;
>      }
>      p.isBlockDev = S_ISBLK(sb.st_mode);
>      p.isDirect = O_DIRECT && (oflags & O_DIRECT);
> @@ -4906,53 +4931,79 @@ virFileDiskCopy(int disk_fd, const char *disk_path, int remote_fd, const char *r
>      default:
>          virReportSystemError(EINVAL, _("Unable to process file with flags %d"),
>                               (oflags & O_ACCMODE));
> -        goto cleanup;
> +        goto out;
>      }
>      if (!p.isBlockDev && p.isDirect) {
>          off_t off = lseek(disk_fd, 0, SEEK_CUR);
>          if (off < 0) {
>              virReportSystemError(errno, "%s", _("O_DIRECT needs a seekable file"));
> -            goto cleanup;
> +            goto out;
>          }
>          if (virFileDirectAlign(off) != off) {
>              /* we could write some zeroes, but maybe it is safer to just fail */
>              virReportSystemError(EINVAL, "%s", _("O_DIRECT attempted with unaligned file pointer"));
> -            goto cleanup;
> +            goto out;
>          }
>      }
> -    total = runIOCopy(p);
> -    if (total < 0)
> -        goto cleanup;
> -
> -    /* Ensure all data is written */
> -    if (virFileDataSync(p.fdout) < 0) {
> -        if (errno != EINVAL && errno != EROFS) {
> -            /* fdatasync() may fail on some special FDs, e.g. pipes */
> -            virReportSystemError(errno, _("unable to fsync %s"), p.fdoutname);
> -            goto cleanup;
> +    new_total = runIOCopy(p);
> +    if (new_total < 0)
> +        goto out;
> +
> +    if (p.idx < 0 && p.isWrite) {
> +        /* without channels we can run the fdatasync here */
> +        if (virFileDataSync(disk_fd) < 0) {
> +            if (errno != EINVAL && errno != EROFS) {
> +                virReportSystemError(errno, _("unable to fsyncdata %s"), p.fdoutname);
> +                new_total = -1;
> +                goto out;
> +            }
>          }
>      }
>  
> -    ret = 0;
> -
> - cleanup:
> -    if (VIR_CLOSE(disk_fd) < 0 && ret == 0) {
> -        virReportSystemError(errno, _("Unable to close %s"), disk_path);
> -        ret = -1;
> -    }
> -    return ret;
> + out:
> +    return new_total;
>  }
>  
>  #else /* WIN32 */
>  
>  off_t
> -virFileDiskCopy(int disk_fd G_GNUC_UNUSED,
> -                const char *disk_path G_GNUC_UNUSED,
> -                int remote_fd G_GNUC_UNUSED,
> -                const char *remote_path G_GNUC_UNUSED)
> +virFileDiskCopyChannel(int disk_fd G_GNUC_UNUSED,
> +                       const char *disk_path G_GNUC_UNUSED,
> +                       int remote_fd G_GNUC_UNUSED,
> +                       const char *remote_path G_GNUC_UNUSED,
> +                       int idx G_GNUC_UNUSED,
> +                       int nchannels G_GNUC_UNUSED,
> +                       off_t total G_GNUC_UNUSED)
>  {
>      virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
> -                   _("virFileDiskCopy unsupported on this platform"));
> +                   _("virFileDiskCopyChannel unsupported on this platform"));
>      return -1;
>  }
>  #endif /* WIN32 */
> +
> +/**
> + * virFileDiskCopy: run IO to copy data between storage and a pipe or socket.
> + *
> + * @disk_fd:     the already open regular file or block device
> + * @disk_path:   the pathname corresponding to disk_fd (for error reporting)
> + * @remote_fd:   the pipe or socket
> + *               Use -1 to auto-choose between STDIN or STDOUT.
> + * @remote_path: the pathname corresponding to remote_fd (for error reporting)
> + *
> + * Note that the direction of the transfer is detected based on the @disk_fd
> + * file access mode (man 2 open). Therefore @disk_fd must be opened with
> + * O_RDONLY or O_WRONLY. O_RDWR is not supported.
> + *
> + * virFileDiskCopy always closes the file descriptor disk_fd,
> + * and any error during close(2) is reported and considered a failure.

this is not true anymore, the close needs to be done outside of virFileDiskCopy now.

> + *
> + * Returns: bytes transferred or < 0 on failure.
> + */
> +
> +off_t
> +virFileDiskCopy(int disk_fd, const char *disk_path,
> +                int remote_fd, const char *remote_path)
> +{
> +    return virFileDiskCopyChannel(disk_fd, disk_path, remote_fd, remote_path,
> +                                  -1, -1, 0);
> +}
> diff --git a/src/util/virfile.h b/src/util/virfile.h
> index 844261e0a4..4d75389c84 100644
> --- a/src/util/virfile.h
> +++ b/src/util/virfile.h
> @@ -394,3 +394,5 @@ int virFileSetCOW(const char *path,
>                    virTristateBool state);
>  
>  off_t virFileDiskCopy(int disk_fd, const char *disk_path, int remote_fd, const char *remote_path);
> +off_t virFileDiskCopyChannel(int disk_fd, const char *disk_path, int remote_fd, const char *remote_path,
> +                             int idx, int nchannels, off_t total);