[libvirt] [PATCH v2 15/38] Introduce virStreamSparseSendAll

Michal Privoznik posted 38 patches 8 years, 9 months ago
There is a newer version of this series
[libvirt] [PATCH v2 15/38] Introduce virStreamSparseSendAll
Posted by Michal Privoznik 8 years, 9 months ago
This is just a wrapper over a new function that has just been
introduced: virStreamSkip(). It's very similar to
virStreamSendAll() except that it handles sparse streams well.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 include/libvirt/libvirt-stream.h |  59 +++++++++++++++++++--
 src/libvirt-stream.c             | 107 +++++++++++++++++++++++++++++++++++++++
 src/libvirt_public.syms          |   1 +
 3 files changed, 164 insertions(+), 3 deletions(-)

diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h
index e5f5126..3e9ff11 100644
--- a/include/libvirt/libvirt-stream.h
+++ b/include/libvirt/libvirt-stream.h
@@ -69,9 +69,9 @@ int virStreamHoleSize(virStreamPtr,
  * @nbytes: size of the data array
  * @opaque: optional application provided data
  *
- * The virStreamSourceFunc callback is used together
- * with the virStreamSendAll function for libvirt to
- * obtain the data that is to be sent.
+ * The virStreamSourceFunc callback is used together with
+ * the virStreamSendAll and virStreamSparseSendAll functions
+ * for libvirt to obtain the data that is to be sent.
  *
  * The callback will be invoked multiple times,
  * fetching data in small chunks. The application
@@ -95,6 +95,59 @@ int virStreamSendAll(virStreamPtr st,
                      void *opaque);
 
 /**
+ * virStreamSourceHoleFunc:
+ * @st: the stream object
+ * @inData: are we in data section
+ * @length: how long is the section we are currently in
+ * @opaque: optional application provided data
+ *
+ * The virStreamSourceHoleFunc callback is used together
+ * with the virStreamSparseSendAll function for libvirt to
+ * obtain the length of section stream is currently in.
+ *
+ * Moreover, upon successful return, @length should be
+ * updated with how much bytes are there left until current
+ * section ends (be it data section or hole section) and if
+ * the stream is currently in data section, @inData should
+ * be set to a non-zero value and vice versa.
+ * As a corner case, there's an implicit hole at the end of
+ * each file. If that's the case, @inData should be set to 0
+ * as well as @length.
+ * Moreover, this function should upon its return leave the
+ * file in the position it was called with.
+ *
+ * Returns 0 on success,
+ *        -1 upon error
+ */
+typedef int (*virStreamSourceHoleFunc)(virStreamPtr st,
+                                       int *inData,
+                                       unsigned long long *length,
+                                       void *opaque);
+
+/**
+ * virStreamSourceSkipFunc:
+ * @st: the stream object
+ * @length: stream hole size
+ * @opaque: optional application provided data
+ *
+ * This callback is used together with the virStreamSparseSendAll
+ * to skip holes in the underlying file as reported by
+ * virStreamSourceHoleFunc.
+ *
+ * Returns 0 on success,
+ *        -1 upon error.
+ */
+typedef int (*virStreamSourceSkipFunc)(virStreamPtr st,
+                                       unsigned long long length,
+                                       void *opaque);
+
+int virStreamSparseSendAll(virStreamPtr st,
+                           virStreamSourceFunc handler,
+                           virStreamSourceHoleFunc holeHandler,
+                           virStreamSourceSkipFunc skipHandler,
+                           void *opaque);
+
+/**
  * virStreamSinkFunc:
  *
  * @st: the stream object
diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c
index 81190cc..7ad5a38 100644
--- a/src/libvirt-stream.c
+++ b/src/libvirt-stream.c
@@ -565,7 +565,114 @@ virStreamSendAll(virStreamPtr stream,
 }
 
 
+
 /**
+ * virStreamSparseSendAll:
+ * @stream: pointer to the stream object
+ * @handler: source callback for reading data from application
+ * @holeHandler: source callback for determining holes
+ * @skipHandler: skip holes as reported by @holeHandler
+ * @opaque: application defined data
+ *
+ * Some dummy description here.
+ *
+ * Opaque data in @opaque are shared between @handler, @holeHandler and @skipHandler.
+ *
+ * Returns 0 if all the data was successfully sent. The caller
+ * should invoke virStreamFinish(st) to flush the stream upon
+ * success and then virStreamFree.
+ *
+ * Returns -1 upon any error, with virStreamAbort() already
+ * having been called,  so the caller need only call
+ * virStreamFree().
+ */
+int virStreamSparseSendAll(virStreamPtr stream,
+                           virStreamSourceFunc handler,
+                           virStreamSourceHoleFunc holeHandler,
+                           virStreamSourceSkipFunc skipHandler,
+                           void *opaque)
+{
+    char *bytes = NULL;
+    size_t want = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX;
+    int ret = -1;
+    unsigned long long dataLen = 0;
+
+    VIR_DEBUG("stream=%p handler=%p holeHandler=%p opaque=%p",
+              stream, handler, holeHandler, opaque);
+
+    virResetLastError();
+
+    virCheckStreamReturn(stream, -1);
+    virCheckNonNullArgGoto(handler, cleanup);
+    virCheckNonNullArgGoto(holeHandler, cleanup);
+    virCheckNonNullArgGoto(skipHandler, cleanup);
+
+    if (stream->flags & VIR_STREAM_NONBLOCK) {
+        virReportError(VIR_ERR_OPERATION_INVALID, "%s",
+                       _("data sources cannot be used for non-blocking streams"));
+        goto cleanup;
+    }
+
+    if (VIR_ALLOC_N(bytes, want) < 0)
+        goto cleanup;
+
+    for (;;) {
+        int inData, got, offset = 0;
+        unsigned long long sectionLen;
+
+        if (!dataLen) {
+            if (holeHandler(stream, &inData, &sectionLen, opaque) < 0) {
+                virStreamAbort(stream);
+                goto cleanup;
+            }
+
+            if (!inData && sectionLen) {
+                if (virStreamSkip(stream, sectionLen) < 0) {
+                    virStreamAbort(stream);
+                    goto cleanup;
+                }
+
+                if (skipHandler(stream, sectionLen, opaque) < 0) {
+                    virReportSystemError(errno, "%s",
+                                         _("unable to skip hole"));
+                    virStreamAbort(stream);
+                    goto cleanup;
+                }
+                continue;
+            } else {
+                dataLen = sectionLen;
+            }
+        }
+
+        if (want > dataLen)
+            want = dataLen;
+
+        got = (handler)(stream, bytes, want, opaque);
+        if (got < 0) {
+            virStreamAbort(stream);
+            goto cleanup;
+        }
+        if (got == 0)
+            break;
+        while (offset < got) {
+            int done;
+            done = virStreamSend(stream, bytes + offset, got - offset);
+            if (done < 0)
+                goto cleanup;
+            offset += done;
+            dataLen -= done;
+        }
+    }
+    ret = 0;
+
+ cleanup:
+    VIR_FREE(bytes);
+
+    if (ret != 0)
+        virDispatchError(stream->conn);
+
+    return ret;
+}/**
  * virStreamRecvAll:
  * @stream: pointer to the stream object
  * @handler: sink callback for writing data to application
diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms
index 008dc59..ea4ddd5 100644
--- a/src/libvirt_public.syms
+++ b/src/libvirt_public.syms
@@ -765,6 +765,7 @@ LIBVIRT_3.3.0 {
         virStreamRecvFlags;
         virStreamSkip;
         virStreamSparseRecvAll;
+        virStreamSparseSendAll;
 } LIBVIRT_3.1.0;
 
 # .... define new API here using predicted next version number ....
-- 
2.10.2

--
libvir-list mailing list
libvir-list@redhat.com
https://www.redhat.com/mailman/listinfo/libvir-list
Re: [libvirt] [PATCH v2 15/38] Introduce virStreamSparseSendAll
Posted by John Ferlan 8 years, 9 months ago

On 04/20/2017 06:01 AM, Michal Privoznik wrote:
> This is just a wrapper over new function that have been just
> introduced: virStreamSkip() . It's very similar to
> virStreamSendAll() except it handles sparse streams well.

You could have some changes here due to previous API name changes.
Still, the commit message is a bit terse and needs some cleanup anyway.

> 
> Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
> ---
>  include/libvirt/libvirt-stream.h |  59 +++++++++++++++++++--
>  src/libvirt-stream.c             | 107 +++++++++++++++++++++++++++++++++++++++
>  src/libvirt_public.syms          |   1 +
>  3 files changed, 164 insertions(+), 3 deletions(-)
> 
> diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h
> index e5f5126..3e9ff11 100644
> --- a/include/libvirt/libvirt-stream.h
> +++ b/include/libvirt/libvirt-stream.h
> @@ -69,9 +69,9 @@ int virStreamHoleSize(virStreamPtr,
>   * @nbytes: size of the data array
>   * @opaque: optional application provided data
>   *
> - * The virStreamSourceFunc callback is used together
> - * with the virStreamSendAll function for libvirt to
> - * obtain the data that is to be sent.
> + * The virStreamSourceFunc callback is used together with
> + * the virStreamSendAll and virStreamSparseSendAll functions
> + * for libvirt to obtain the data that is to be sent.
>   *
>   * The callback will be invoked multiple times,
>   * fetching data in small chunks. The application
> @@ -95,6 +95,59 @@ int virStreamSendAll(virStreamPtr st,
>                       void *opaque);
>  
>  /**
> + * virStreamSourceHoleFunc:
> + * @st: the stream object
> + * @inData: are we in data section
> + * @length: how long is the section we are currently in
> + * @opaque: optional application provided data
> + *
> + * The virStreamSourceHoleFunc callback is used together
> + * with the virStreamSparseSendAll function for libvirt to
> + * obtain the length of section stream is currently in.
> + *
> + * Moreover, upon successful return, @length should be
> + * updated with how much bytes are there left until current

s/much/many

s/there//

s/until/until the/

> + * section ends (be it data section or hole section) and if

s/be it/either/

s/) and if/). Also if/

> + * the stream is currently in data section, @inData should
> + * be set to a non-zero value and vice versa.

Need extra line between paragraphs

> + * As a corner case, there's an implicit hole at the end of

s/As a corner case,/NB:/

> + * each file. If that's the case, @inData should be set to 0
> + * as well as @length.

... @inData and @length should both be set to 0. (reads better to me)

Need extra line between paragraphs

> + * Moreover, this function should upon its return leave the
> + * file in the position it was called with.

Consider instead:

This function is not expected to adjust the current position within the
file.

(or more explicitly - should not adjust!)
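To illustrate the "should not adjust the position" requirement, here is a
minimal sketch of what an application-side hole callback might look like.
It is only an assumption about one possible implementation, not code from
this series: the name exampleHoleFunc is hypothetical, @opaque is assumed
to point to an open file descriptor, virStreamPtr is replaced by void * so
the snippet builds without libvirt headers, and it relies on the Linux
lseek() extensions SEEK_DATA and SEEK_HOLE.

```c
#define _GNU_SOURCE             /* SEEK_DATA / SEEK_HOLE on Linux */
#include <errno.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical callback: reports whether the current file position is in
 * a data section or in a hole, and how many bytes remain in that section.
 * The file position is restored before returning. */
static int
exampleHoleFunc(void *st, int *inData, unsigned long long *length, void *opaque)
{
    int fd = *(int *)opaque;    /* assumption: opaque points to an fd */
    off_t cur, data, end;

    (void)st;                   /* unused in this sketch */

    if ((cur = lseek(fd, 0, SEEK_CUR)) == (off_t)-1)
        return -1;

    data = lseek(fd, cur, SEEK_DATA);
    if (data == (off_t)-1) {
        /* ENXIO means there is no data at or after cur: we are either
         * exactly at EOF or inside a trailing hole. */
        if (errno != ENXIO)
            return -1;
        if ((end = lseek(fd, 0, SEEK_END)) == (off_t)-1)
            return -1;
        *inData = 0;
        *length = (unsigned long long)(end - cur);  /* 0 at EOF */
    } else if (data > cur) {
        /* We are in a hole that ends where the next data begins. */
        *inData = 0;
        *length = (unsigned long long)(data - cur);
    } else {
        /* We are in data; the section ends at the next hole (or EOF). */
        if ((end = lseek(fd, cur, SEEK_HOLE)) == (off_t)-1)
            return -1;
        *inData = 1;
        *length = (unsigned long long)(end - cur);
    }

    /* Leave the file position exactly where the caller had it. */
    if (lseek(fd, cur, SEEK_SET) == (off_t)-1)
        return -1;

    return 0;
}
```

At EOF this reports @inData = 0 and @length = 0, which matches the
implicit-hole corner case described in the comment under review.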

> + *
> + * Returns 0 on success,
> + *        -1 upon error
> + */
> +typedef int (*virStreamSourceHoleFunc)(virStreamPtr st,
> +                                       int *inData,
> +                                       unsigned long long *length,
> +                                       void *opaque);
> +
> +/**
> + * virStreamSourceSkipFunc:
> + * @st: the stream object
> + * @length: stream hole size
> + * @opaque: optional application provided data
> + *
> + * This callback is used together with the virStreamSparseSendAll
> + * to skip holes in the underlying file as reported by
> + * virStreamSourceHoleFunc.

The callback may be invoked multiple times as holes are found during
processing a stream. The application should skip processing the hole in
the stream source and then return. A return value of -1 at any time will
abort the send operation.
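For a plain file, skipping a hole in the stream source can be as simple as
moving the file position forward. The following is a rough sketch under
stated assumptions rather than the implementation from this series: the
name exampleSkipFunc is hypothetical, @opaque is assumed to point to an
open file descriptor, and virStreamPtr is replaced by void * so that the
snippet compiles without libvirt headers.

```c
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical callback: advance the source file past a hole of @length
 * bytes so that the next read starts at the following data section. */
static int
exampleSkipFunc(void *st, unsigned long long length, void *opaque)
{
    int fd = *(int *)opaque;    /* assumption: opaque points to an fd */

    (void)st;                   /* unused in this sketch */

    /* lseek() sets errno on failure; the caller in the patch reports it
     * via virReportSystemError(errno, ...). */
    if (lseek(fd, (off_t)length, SEEK_CUR) == (off_t)-1)
        return -1;

    return 0;
}
```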

> + *
> + * Returns 0 on success,
> + *        -1 upon error.
> + */
> +typedef int (*virStreamSourceSkipFunc)(virStreamPtr st,
> +                                       unsigned long long length,
> +                                       void *opaque);
> +
> +int virStreamSparseSendAll(virStreamPtr st,
> +                           virStreamSourceFunc handler,
> +                           virStreamSourceHoleFunc holeHandler,
> +                           virStreamSourceSkipFunc skipHandler,
> +                           void *opaque);
> +
> +/**
>   * virStreamSinkFunc:
>   *
>   * @st: the stream object
> diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c
> index 81190cc..7ad5a38 100644
> --- a/src/libvirt-stream.c
> +++ b/src/libvirt-stream.c
> @@ -565,7 +565,114 @@ virStreamSendAll(virStreamPtr stream,
>  }
>  
>  
> +

Spurious extra line. Now there are 3 blank lines between functions.

>  /**
> + * virStreamSparseSendAll:
> + * @stream: pointer to the stream object
> + * @handler: source callback for reading data from application
> + * @holeHandler: source callback for determining holes
> + * @skipHandler: skip holes as reported by @holeHandler
> + * @opaque: application defined data
> + *
> + * Some dummy description here.

Some smart-aleck comment here ;-)

> + *
> + * Opaque data in @opaque are shared between @handler, @holeHandler and @skipHandler.
> + *
> + * Returns 0 if all the data was successfully sent. The caller
> + * should invoke virStreamFinish(st) to flush the stream upon
> + * success and then virStreamFree.
> + *
> + * Returns -1 upon any error, with virStreamAbort() already
> + * having been called,  so the caller need only call
> + * virStreamFree().
> + */
> +int virStreamSparseSendAll(virStreamPtr stream,
> +                           virStreamSourceFunc handler,
> +                           virStreamSourceHoleFunc holeHandler,
> +                           virStreamSourceSkipFunc skipHandler,
> +                           void *opaque)
> +{
> +    char *bytes = NULL;
> +    size_t want = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX;
> +    int ret = -1;
> +    unsigned long long dataLen = 0;
> +
> +    VIR_DEBUG("stream=%p handler=%p holeHandler=%p opaque=%p",
> +              stream, handler, holeHandler, opaque);
> +
> +    virResetLastError();
> +
> +    virCheckStreamReturn(stream, -1);
> +    virCheckNonNullArgGoto(handler, cleanup);
> +    virCheckNonNullArgGoto(holeHandler, cleanup);
> +    virCheckNonNullArgGoto(skipHandler, cleanup);
> +
> +    if (stream->flags & VIR_STREAM_NONBLOCK) {
> +        virReportError(VIR_ERR_OPERATION_INVALID, "%s",
> +                       _("data sources cannot be used for non-blocking streams"));
> +        goto cleanup;
> +    }
> +
> +    if (VIR_ALLOC_N(bytes, want) < 0)
> +        goto cleanup;
> +
> +    for (;;) {
> +        int inData, got, offset = 0;
> +        unsigned long long sectionLen;
> +
> +        if (!dataLen) {
> +            if (holeHandler(stream, &inData, &sectionLen, opaque) < 0) {
> +                virStreamAbort(stream);
> +                goto cleanup;
> +            }
> +
> +            if (!inData && sectionLen) {

So if !inData and sectionLen == 0, we don't go here, but

> +                if (virStreamSkip(stream, sectionLen) < 0) {
> +                    virStreamAbort(stream);
> +                    goto cleanup;
> +                }
> +
> +                if (skipHandler(stream, sectionLen, opaque) < 0) {
> +                    virReportSystemError(errno, "%s",
> +                                         _("unable to skip hole"));
> +                    virStreamAbort(stream);
> +                    goto cleanup;
> +                }
> +                continue;
> +            } else {
> +                dataLen = sectionLen;

We go here, setting 'dataLen = 0', which is a misnomer since we're not
in data; rather, we're at EOF, right?  Or did I miss something a bit subtle?

> +            }
> +        }
> +
> +        if (want > dataLen)
> +            want = dataLen;
> +
> +        got = (handler)(stream, bytes, want, opaque);
> +        if (got < 0) {
> +            virStreamAbort(stream);
> +            goto cleanup;
> +        }
> +        if (got == 0)
> +            break;
> +        while (offset < got) {
> +            int done;
> +            done = virStreamSend(stream, bytes + offset, got - offset);
> +            if (done < 0)
> +                goto cleanup;
> +            offset += done;
> +            dataLen -= done;
> +        }
> +    }
> +    ret = 0;
> +
> + cleanup:
> +    VIR_FREE(bytes);
> +
> +    if (ret != 0)
> +        virDispatchError(stream->conn);
> +
> +    return ret;
> +}/**

Need to clean up the spacing here... e.g.

}


/**


Another one that is close to an ACK. It needs some cleanup here, and I'd
like to see the fallout from the previous patches.

John
>   * virStreamRecvAll:
>   * @stream: pointer to the stream object
>   * @handler: sink callback for writing data to application
> diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms
> index 008dc59..ea4ddd5 100644
> --- a/src/libvirt_public.syms
> +++ b/src/libvirt_public.syms
> @@ -765,6 +765,7 @@ LIBVIRT_3.3.0 {
>          virStreamRecvFlags;
>          virStreamSkip;
>          virStreamSparseRecvAll;
> +        virStreamSparseSendAll;
>  } LIBVIRT_3.1.0;
>  
>  # .... define new API here using predicted next version number ....
> 

Re: [libvirt] [PATCH v2 15/38] Introduce virStreamSparseSendAll
Posted by Michal Privoznik 8 years, 9 months ago
On 05/04/2017 11:35 PM, John Ferlan wrote:
> 
> 
> On 04/20/2017 06:01 AM, Michal Privoznik wrote:
>> This is just a wrapper over new function that have been just
>> introduced: virStreamSkip() . It's very similar to
>> virStreamSendAll() except it handles sparse streams well.
> 
> You could have some changes here due to previous API name changes.
> Still the commit msg is a bit terse needs some cleanup anyway.
> 
>>
>> Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
>> ---
>>  include/libvirt/libvirt-stream.h |  59 +++++++++++++++++++--
>>  src/libvirt-stream.c             | 107 +++++++++++++++++++++++++++++++++++++++
>>  src/libvirt_public.syms          |   1 +
>>  3 files changed, 164 insertions(+), 3 deletions(-)
>>


>>  /**
>> + * virStreamSparseSendAll:
>> + * @stream: pointer to the stream object
>> + * @handler: source callback for reading data from application
>> + * @holeHandler: source callback for determining holes
>> + * @skipHandler: skip holes as reported by @holeHandler
>> + * @opaque: application defined data
>> + *
>> + * Some dummy description here.
> 
> Some smart-aleck comment here ;-)

Oh, I thought I'd fixed this before sending O:-)

> 
>> + *
>> + * Opaque data in @opaque are shared between @handler, @holeHandler and @skipHandler.
>> + *
>> + * Returns 0 if all the data was successfully sent. The caller
>> + * should invoke virStreamFinish(st) to flush the stream upon
>> + * success and then virStreamFree.
>> + *
>> + * Returns -1 upon any error, with virStreamAbort() already
>> + * having been called,  so the caller need only call
>> + * virStreamFree().
>> + */
>> +int virStreamSparseSendAll(virStreamPtr stream,
>> +                           virStreamSourceFunc handler,
>> +                           virStreamSourceHoleFunc holeHandler,
>> +                           virStreamSourceSkipFunc skipHandler,
>> +                           void *opaque)
>> +{
>> +    char *bytes = NULL;
>> +    size_t want = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX;
>> +    int ret = -1;
>> +    unsigned long long dataLen = 0;
>> +
>> +    VIR_DEBUG("stream=%p handler=%p holeHandler=%p opaque=%p",
>> +              stream, handler, holeHandler, opaque);
>> +
>> +    virResetLastError();
>> +
>> +    virCheckStreamReturn(stream, -1);
>> +    virCheckNonNullArgGoto(handler, cleanup);
>> +    virCheckNonNullArgGoto(holeHandler, cleanup);
>> +    virCheckNonNullArgGoto(skipHandler, cleanup);
>> +
>> +    if (stream->flags & VIR_STREAM_NONBLOCK) {
>> +        virReportError(VIR_ERR_OPERATION_INVALID, "%s",
>> +                       _("data sources cannot be used for non-blocking streams"));
>> +        goto cleanup;
>> +    }
>> +
>> +    if (VIR_ALLOC_N(bytes, want) < 0)
>> +        goto cleanup;
>> +
>> +    for (;;) {
>> +        int inData, got, offset = 0;
>> +        unsigned long long sectionLen;
>> +
>> +        if (!dataLen) {
>> +            if (holeHandler(stream, &inData, &sectionLen, opaque) < 0) {
>> +                virStreamAbort(stream);
>> +                goto cleanup;
>> +            }
>> +
>> +            if (!inData && sectionLen) {
> 
> So if !inData and sectionLen == 0, we don't go here, but
> 
>> +                if (virStreamSkip(stream, sectionLen) < 0) {
>> +                    virStreamAbort(stream);
>> +                    goto cleanup;
>> +                }
>> +
>> +                if (skipHandler(stream, sectionLen, opaque) < 0) {
>> +                    virReportSystemError(errno, "%s",
>> +                                         _("unable to skip hole"));
>> +                    virStreamAbort(stream);
>> +                    goto cleanup;
>> +                }
>> +                continue;
>> +            } else {
>> +                dataLen = sectionLen;
> 
> We go here indicating 'dataLen = 0' which is a misnomer since we're not
> inData, rather we at EOF, right?  Or did I miss something a bit subtle.

Yeah, we go through another iteration of @handler. We are at EOF, so it
doesn't matter. Basically this is for consistency with the daemon
counterpart.  NB: with the current implementation there should never be
a STREAM_SKIP packet with len = 0. I wanted the pre-existing code (i.e.
virStreamRecv) to handle EOF instead of sending a useless packet every
time. Consider the following example:

data -> hole -> data -> eof.

The sequence of packets sent then looks like this:

STREAM + data -> SKIP -> STREAM + data -> STREAM + no data

The last packet sent is a STREAM packet without any data and with
header.status = VIR_NET_OK, meaning there are no other packets in the
stream; this is the last one. If we were to treat the implicit hole at
EOF just like a regular one, there would either be a dummy SKIP packet
between the last two STREAM packets, or it would replace the final
STREAM + no data packet. Either way, we don't need two possible
representations of EOF at the moment. Let's have just one for now. To
achieve that, we iterate over the loop once more (despite knowing that
we are at EOF) and let the existing code send the EOF representation.
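
The packet sequence described above can be modelled with a small
standalone sketch. This is only an illustration under simplifying
assumptions, not libvirt code: struct section and tracePackets are
hypothetical names, each non-empty data section is shown as a single
STREAM packet (the real loop may split it into several), and a
zero-length hole produces no packet at all, so EOF always has exactly
one representation.

```c
#include <stdio.h>

/* Hypothetical model of one section of a sparse file. */
struct section {
    int inData;                 /* non-zero: data section, zero: hole */
    unsigned long long len;     /* section length in bytes */
};

/* Write the packet sequence for @secs into @out, separated by " -> ".
 * Data sections are assumed to be non-empty. */
static void
tracePackets(const struct section *secs, size_t nsecs, char *out, size_t outlen)
{
    size_t used = 0;
    size_t i;

    if (outlen == 0)
        return;
    out[0] = '\0';

    /* The extra iteration (i == nsecs) models the final pass through the
     * loop at EOF, which emits the empty STREAM packet. */
    for (i = 0; i <= nsecs; i++) {
        const char *pkt;
        int n;

        if (i == nsecs)
            pkt = "STREAM + no data";
        else if (secs[i].inData)
            pkt = "STREAM + data";
        else if (secs[i].len > 0)
            pkt = "SKIP";
        else
            continue;           /* implicit hole at EOF: no SKIP packet */

        n = snprintf(out + used, outlen - used, "%s%s",
                     used ? " -> " : "", pkt);
        if (n < 0 || (size_t)n >= outlen - used)
            return;             /* output truncated, stop here */
        used += (size_t)n;
    }
}
```

Feeding it the data -> hole -> data example yields the same four-packet
sequence as above, whether or not the zero-length hole at EOF is listed
explicitly as a section.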

Michal
