[PATCH v4 3/5] ch: events: Read and parse cloud-hypervisor events

Purna Pavan Chandra Aekkaladevi posted 5 patches 2 weeks, 5 days ago
[PATCH v4 3/5] ch: events: Read and parse cloud-hypervisor events
Posted by Purna Pavan Chandra Aekkaladevi 2 weeks, 5 days ago
Implement `virCHReadProcessEvents` and `virCHProcessEvents` to read events
from the event monitor FIFO file and parse them accordingly.

Signed-off-by: Purna Pavan Chandra Aekkaladevi <paekkaladevi@linux.microsoft.com>
Co-authored-by: Vineeth Pillai <viremana@linux.microsoft.com>
---
 src/ch/ch_events.c  | 137 +++++++++++++++++++++++++++++++++++++++++++-
 src/ch/ch_events.h  |   2 +
 src/ch/ch_monitor.h |   6 ++
 3 files changed, 144 insertions(+), 1 deletion(-)

diff --git a/src/ch/ch_events.c b/src/ch/ch_events.c
index 247f99cef0..a4c2fc4130 100644
--- a/src/ch/ch_events.c
+++ b/src/ch/ch_events.c
@@ -45,6 +45,137 @@ static int virCHEventStopProcess(virDomainObj *vm,
     return 0;
 }
 
+/**
+ * virCHProcessEvents:
+ * @mon: the CH monitor object
+ *
+ * Parse the events from the event buffer and process them
+ * Example event:
+ * {
+ *   "timestamp": {
+ *     "secs": 0,
+ *     "nanos": 29228206
+ *    },
+ *   "source": "vm",
+ *   "event": "booted",
+ *   "properties": null
+ * }
+ *
+ * Returns: 0 on success, -1 on failure
+ */
+static int virCHProcessEvents(virCHMonitor *mon)
+{
+    virDomainObj *vm = mon->vm;
+    char *buf = mon->event_buffer.buffer;
+    ssize_t sz = mon->event_buffer.buf_fill_sz;
+    virJSONValue *obj = NULL;
+    int blocks = 0;
+    size_t i = 0;
+    char *json_start;
+    ssize_t start_index = -1;
+    ssize_t end_index = -1;
+    char tmp;
+    int ret = 0;
+
+    while (i < sz) {
+        if (buf[i] == '{') {
+            blocks++;
+            if (blocks == 1)
+                start_index = i;
+        } else if (buf[i] == '}' && blocks > 0) {
+            blocks--;
+            if (blocks == 0) {
+                /* valid json document */
+                end_index = i;
+
+                /* temporarily null terminate the JSON doc */
+                tmp = buf[end_index + 1];
+                buf[end_index + 1] = '\0';
+                json_start = buf + start_index;
+
+                if ((obj = virJSONValueFromString(json_start))) {
+                    /* Process the event string (obj) here */
+                    virJSONValueFree(obj);
+                } else {
+                    VIR_WARN("%s: Invalid JSON event doc: %s",
+                             vm->def->name, json_start);
+                    ret = -1;
+                }
+
+                /* replace the original character */
+                buf[end_index + 1] = tmp;
+                start_index = -1;
+            }
+        }
+
+        i++;
+    }
+
+    if (start_index == -1) {
+        /* We have processed all the JSON docs in the buffer */
+        mon->event_buffer.buf_fill_sz = 0;
+    } else if (start_index > 0) {
+        /* We have an incomplete JSON doc at the end of the buffer
+         * Move it to the start of the buffer
+         */
+        mon->event_buffer.buf_fill_sz = sz - start_index;
+        memmove(buf, buf+start_index, mon->event_buffer.buf_fill_sz);
+    }
+
+    return ret;
+}
+
+static void virCHReadProcessEvents(virCHMonitor *mon,
+                                   int event_monitor_fd)
+{
+    /* Event json string must always terminate with null char.
+     * So, reserve one byte for '\0' at the end.
+     */
+    size_t max_sz = CH_EVENT_BUFFER_SZ - 1;
+    char *buf = mon->event_buffer.buffer;
+    virDomainObj *vm = mon->vm;
+    bool incomplete = false;
+    size_t sz = 0;
+
+    memset(buf, 0, max_sz);
+    do {
+        ssize_t ret;
+
+        ret = read(event_monitor_fd, buf + sz, max_sz - sz);
+        if (ret == 0 || (ret < 0 && errno == EINTR)) {
+            g_usleep(G_USEC_PER_SEC);
+            continue;
+        } else if (ret < 0) {
+            /* We should never reach here. read(2) says possible errors
+             * are EINTR, EAGAIN, EBADF, EFAULT, EINVAL, EIO, EISDIR
+             * We handle EINTR gracefully. There is some serious issue
+             * if we encounter any of the other errors(either in our code
+             * or in the system). Better to bail out.
+             */
+            VIR_ERROR(_("%1$s: Failed to read ch events!: %2$s"),
+                      vm->def->name, g_strerror(errno));
+            virCHEventStopProcess(vm, VIR_DOMAIN_SHUTOFF_FAILED);
+            virCHStopEventHandler(mon);
+            return;
+        }
+
+        sz += ret;
+        mon->event_buffer.buf_fill_sz = sz;
+
+        if (virCHProcessEvents(mon) < 0)
+            VIR_WARN("%s: Failed to parse and process events", vm->def->name);
+
+        if (mon->event_buffer.buf_fill_sz != 0)
+            incomplete = true;
+        else
+            incomplete = false;
+        sz = mon->event_buffer.buf_fill_sz;
+
+    } while (virDomainObjIsActive(vm) && (sz < max_sz) && incomplete);
+
+    return;
+}
+
 static void virCHEventHandlerLoop(void *data)
 {
     virCHMonitor *mon = data;
@@ -56,6 +187,9 @@ static void virCHEventHandlerLoop(void *data)
 
     VIR_DEBUG("%s: Event handler loop thread starting", vm->def->name);
 
+    mon->event_buffer.buffer = g_malloc_n(sizeof(char), CH_EVENT_BUFFER_SZ);
+    mon->event_buffer.buf_fill_sz = 0;
+
     while ((event_monitor_fd = open(mon->eventmonitorpath, O_RDONLY)) < 0) {
         if (errno == EINTR) {
             /* 100 milli seconds */
@@ -74,10 +208,11 @@ static void virCHEventHandlerLoop(void *data)
 
     while (g_atomic_int_get(&mon->event_handler_stop) == 0) {
         VIR_DEBUG("%s: Reading events from event monitor file", vm->def->name);
-        /* Read and process events here */
+        virCHReadProcessEvents(mon, event_monitor_fd);
     }
 
  end:
+    g_clear_pointer(&mon->event_buffer.buffer, g_free);
     VIR_FORCE_CLOSE(event_monitor_fd);
     virObjectUnref(vm);
     VIR_DEBUG("%s: Event handler loop thread exiting", vm->def->name);
diff --git a/src/ch/ch_events.h b/src/ch/ch_events.h
index 4c8a48231d..2e9cdf03bb 100644
--- a/src/ch/ch_events.h
+++ b/src/ch/ch_events.h
@@ -22,5 +22,7 @@
 
 #include "ch_monitor.h"
 
+#define CH_EVENT_BUFFER_SZ  PIPE_BUF
+
 int virCHStartEventHandler(virCHMonitor *mon);
 void virCHStopEventHandler(virCHMonitor *mon);
diff --git a/src/ch/ch_monitor.h b/src/ch/ch_monitor.h
index 6d154652fa..a84e348938 100644
--- a/src/ch/ch_monitor.h
+++ b/src/ch/ch_monitor.h
@@ -99,6 +99,12 @@ struct _virCHMonitor {
 
     virThread event_handler_thread;
     int event_handler_stop;
+    struct {
+        /* Buffer to hold the data read from pipe */
+        char *buffer;
+        /* Size of the data read from pipe into buffer */
+        size_t buf_fill_sz;
+    } event_buffer;
 
     pid_t pid;
 
-- 
2.34.1
Re: [PATCH v4 3/5] ch: events: Read and parse cloud-hypervisor events
Posted by Praveen K Paladugu 1 week, 4 days ago

On 12/2/2024 3:46 AM, Purna Pavan Chandra Aekkaladevi wrote:
> Implement `chReadProcessEvents` and `chProcessEvents` to read events from
> event monitor FIFO file and parse them accordingly.
> 
> Signed-off-by: Purna Pavan Chandra Aekkaladevi <paekkaladevi@linux.microsoft.com>
> Co-authored-by: Vineeth Pillai <viremana@linux.microsoft.com>
> ---
>   src/ch/ch_events.c  | 137 +++++++++++++++++++++++++++++++++++++++++++-
>   src/ch/ch_events.h  |   2 +
>   src/ch/ch_monitor.h |   6 ++
>   3 files changed, 144 insertions(+), 1 deletion(-)
> 
> diff --git a/src/ch/ch_events.c b/src/ch/ch_events.c
> index 247f99cef0..a4c2fc4130 100644
> --- a/src/ch/ch_events.c
> +++ b/src/ch/ch_events.c
> @@ -45,6 +45,137 @@ static int virCHEventStopProcess(virDomainObj *vm,
>       return 0;
>   }
>   
> +/**
> + * virCHProcessEvents:
> + * @mon: the CH monitor object
> + *
> + * Parse the events from the event buffer and process them
> + * Example event:
> + * {
> + *   "timestamp": {
> + *     "secs": 0,
> + *     "nanos": 29228206
> + *    },
> + *   "source": "vm",
> + *   "event": "booted",
> + *   "properties": null
> + * }
> + *
> + * Returns: 0 on success, -1 on failure
> + */
> +static int virCHProcessEvents(virCHMonitor *mon)
> +{
> +    virDomainObj *vm = mon->vm;
> +    char *buf = mon->event_buffer.buffer;
> +    ssize_t sz = mon->event_buffer.buf_fill_sz;
> +    virJSONValue *obj = NULL;
> +    int blocks = 0;
> +    size_t i = 0;
> +    char *json_start;
> +    ssize_t start_index = -1;
> +    ssize_t end_index = -1;
> +    char tmp;
> +    int ret = 0;
> +
> +    while (i < sz) {
> +        if (buf[i] == '{') {
> +            blocks++;
> +            if (blocks == 1)
> +                start_index = i;
> +        } else if (buf[i] == '}' && blocks > 0) {
> +            blocks--;
> +            if (blocks == 0) {
> +                /* valid json document */
> +                end_index = i;
> +
> +                /* temporarily null terminate the JSON doc */
> +                tmp = buf[end_index + 1];
> +                buf[end_index + 1] = '\0';
> +                json_start = buf + start_index;
> +
> +                if ((obj = virJSONValueFromString(json_start))) {
> +                    /* Process the event string (obj) here */
> +                    virJSONValueFree(obj);
> +                } else {
> +                    VIR_WARN("%s: Invalid JSON event doc: %s",
> +                             vm->def->name, json_start);
> +                    ret = -1;
If you do encounter an invalid JSON document, this implementation will
continue processing the rest of the events. You should return from this
method with -1 here.

> +                }
> +
> +                /* replace the original character */
> +                buf[end_index + 1] = tmp;
> +                start_index = -1;
> +            }
> +        }
> +
> +        i++;
> +    }
> +
> +    if (start_index == -1) {
> +        /* We have processed all the JSON docs in the buffer */
> +        mon->event_buffer.buf_fill_sz = 0;
> +    } else if (start_index > 0) {
> +        /* We have an incomplete JSON doc at the end of the buffer
> +         * Move it to the start of the buffer
> +         */
> +        mon->event_buffer.buf_fill_sz = sz - start_index;
> +        memmove(buf, buf+start_index, mon->event_buffer.buf_fill_sz);
> +    }
> +
> +    return ret;
> +}
> +
> +static void virCHReadProcessEvents(virCHMonitor *mon,
> +                                   int event_monitor_fd)
> +{
> +    /* Event json string must always terminate with null char.
> +     * So, reserve one byte for '\0' at the end.
> +     */
> +    size_t max_sz = CH_EVENT_BUFFER_SZ - 1;
> +    char *buf = mon->event_buffer.buffer;
> +    virDomainObj *vm = mon->vm;
> +    bool incomplete = false;
> +    size_t sz = 0;
> +
> +    memset(buf, 0, max_sz);
> +    do {
> +        ssize_t ret;
> +
> +        ret = read(event_monitor_fd, buf + sz, max_sz - sz);
> +        if (ret == 0 || (ret < 0 && errno == EINTR)) {
> +            g_usleep(G_USEC_PER_SEC);
> +            continue;
> +        } else if (ret < 0) {
> +            /* We should never reach here. read(2) says possible errors
> +             * are EINTR, EAGAIN, EBADF, EFAULT, EINVAL, EIO, EISDIR
> +             * We handle EINTR gracefully. There is some serious issue
> +             * if we encounter any of the other errors(either in our code
> +             * or in the system). Better to bail out.
> +             */
> +            VIR_ERROR(_("%1$s: Failed to read ch events!: %2$s"),
> +                      vm->def->name, g_strerror(errno));
> +            virCHEventStopProcess(vm, VIR_DOMAIN_SHUTOFF_FAILED);
> +            virCHStopEventHandler(mon);
> +            return;
> +        }
> +
> +        sz += ret;
> +        mon->event_buffer.buf_fill_sz = sz;
> +
> +        if (virCHProcessEvents(mon) < 0)
> +            VIR_WARN("%s: Failed to parse and process events", vm->def->name);
> +
> +        if (mon->event_buffer.buf_fill_sz != 0)
> +            incomplete = true;
> +        else
> +            incomplete = false;
> +        sz = mon->event_buffer.buf_fill_sz;
> +
> +    } while (virDomainObjIsActive(vm) && (sz < max_sz) && incomplete);
> +
> +    return;
> +}
> +
>   static void virCHEventHandlerLoop(void *data)
>   {
>       virCHMonitor *mon = data;
> @@ -56,6 +187,9 @@ static void virCHEventHandlerLoop(void *data)
>   
>       VIR_DEBUG("%s: Event handler loop thread starting", vm->def->name);
>   
> +    mon->event_buffer.buffer = g_malloc_n(sizeof(char), CH_EVENT_BUFFER_SZ);
> +    mon->event_buffer.buf_fill_sz = 0;
> +
>       while ((event_monitor_fd = open(mon->eventmonitorpath, O_RDONLY)) < 0) {
>           if (errno == EINTR) {
>               /* 100 milli seconds */
> @@ -74,10 +208,11 @@ static void virCHEventHandlerLoop(void *data)
>   
>       while (g_atomic_int_get(&mon->event_handler_stop) == 0) {
>           VIR_DEBUG("%s: Reading events from event monitor file", vm->def->name);
> -        /* Read and process events here */
> +        virCHReadProcessEvents(mon, event_monitor_fd);
>       }
>   
>    end:
> +    g_clear_pointer(&mon->event_buffer.buffer, g_free);
>       VIR_FORCE_CLOSE(event_monitor_fd);
>       virObjectUnref(vm);
>       VIR_DEBUG("%s: Event handler loop thread exiting", vm->def->name);
> diff --git a/src/ch/ch_events.h b/src/ch/ch_events.h
> index 4c8a48231d..2e9cdf03bb 100644
> --- a/src/ch/ch_events.h
> +++ b/src/ch/ch_events.h
> @@ -22,5 +22,7 @@
>   
>   #include "ch_monitor.h"
>   
> +#define CH_EVENT_BUFFER_SZ  PIPE_BUF
> +
>   int virCHStartEventHandler(virCHMonitor *mon);
>   void virCHStopEventHandler(virCHMonitor *mon);
> diff --git a/src/ch/ch_monitor.h b/src/ch/ch_monitor.h
> index 6d154652fa..a84e348938 100644
> --- a/src/ch/ch_monitor.h
> +++ b/src/ch/ch_monitor.h
> @@ -99,6 +99,12 @@ struct _virCHMonitor {
>   
>       virThread event_handler_thread;
>       int event_handler_stop;
> +    struct {
> +        /* Buffer to hold the data read from pipe */
> +        char *buffer;
> +        /* Size of the data read from pipe into buffer */
> +        size_t buf_fill_sz;
> +    } event_buffer;
>   
>       pid_t pid;
>   

-- 
Regards,
Praveen K Paladugu
Re: [PATCH v4 3/5] ch: events: Read and parse cloud-hypervisor events
Posted by Purna Pavan Chandra Aekkaladevi 1 week, 3 days ago
On Tue, Dec 10, 2024 at 10:28:41AM -0600, Praveen K Paladugu wrote:
> 
> 
> On 12/2/2024 3:46 AM, Purna Pavan Chandra Aekkaladevi wrote:
> >Implement `chReadProcessEvents` and `chProcessEvents` to read events from
> >event monitor FIFO file and parse them accordingly.
> >
> >Signed-off-by: Purna Pavan Chandra Aekkaladevi <paekkaladevi@linux.microsoft.com>
> >Co-authored-by: Vineeth Pillai <viremana@linux.microsoft.com>
> >---
> >  src/ch/ch_events.c  | 137 +++++++++++++++++++++++++++++++++++++++++++-
> >  src/ch/ch_events.h  |   2 +
> >  src/ch/ch_monitor.h |   6 ++
> >  3 files changed, 144 insertions(+), 1 deletion(-)
> >
> >diff --git a/src/ch/ch_events.c b/src/ch/ch_events.c
> >index 247f99cef0..a4c2fc4130 100644
> >--- a/src/ch/ch_events.c
> >+++ b/src/ch/ch_events.c
> >@@ -45,6 +45,137 @@ static int virCHEventStopProcess(virDomainObj *vm,
> >      return 0;
> >  }
> >+static int virCHProcessEvents(virCHMonitor *mon)
> >+{
> >+    virDomainObj *vm = mon->vm;
> >+    char *buf = mon->event_buffer.buffer;
> >+    ssize_t sz = mon->event_buffer.buf_fill_sz;
> >+    virJSONValue *obj = NULL;
> >+    int blocks = 0;
> >+    size_t i = 0;
> >+    char *json_start;
> >+    ssize_t start_index = -1;
> >+    ssize_t end_index = -1;
> >+    char tmp;
> >+    int ret = 0;
> >+
> >+    while (i < sz) {
> >+        if (buf[i] == '{') {
> >+            blocks++;
> >+            if (blocks == 1)
> >+                start_index = i;
> >+        } else if (buf[i] == '}' && blocks > 0) {
> >+            blocks--;
> >+            if (blocks == 0) {
> >+                /* valid json document */
> >+                end_index = i;
> >+
> >+                /* temporarily null terminate the JSON doc */
> >+                tmp = buf[end_index + 1];
> >+                buf[end_index + 1] = '\0';
> >+                json_start = buf + start_index;
> >+
> >+                if ((obj = virJSONValueFromString(json_start))) {
> >+                    /* Process the event string (obj) here */
> >+                    virJSONValueFree(obj);
> >+                } else {
> >+                    VIR_WARN("%s: Invalid JSON event doc: %s",
> >+                             vm->def->name, json_start);
> >+                    ret = -1;
> If you do encounter an invalid JSON, this implementation will continue
> processing rest of the events. You should return from this method with
> -1 here.
> 

Returning -1 here does not halt the thread. So, even if we were to
return -1 here (after memmove-ing the rest of the JSON to the beginning
of buf, just as is done in the 'else if' block below), the thread would
essentially still try to continue processing the rest of the events.

> >+                }
> >+
> >+                /* replace the original character */
> >+                buf[end_index + 1] = tmp;
> >+                start_index = -1;
> >+            }
> >+        }
> >+
> >+        i++;
> >+    }
> >+
> >+    if (start_index == -1) {
> >+        /* We have processed all the JSON docs in the buffer */
> >+        mon->event_buffer.buf_fill_sz = 0;
> >+    } else if (start_index > 0) {
> >+        /* We have an incomplete JSON doc at the end of the buffer
> >+         * Move it to the start of the buffer
> >+         */
> >+        mon->event_buffer.buf_fill_sz = sz - start_index;
> >+        memmove(buf, buf+start_index, mon->event_buffer.buf_fill_sz);
> >+    }
> >+
> >+    return ret;
> >+}
> >+
> >+static void virCHReadProcessEvents(virCHMonitor *mon,
> >+                                   int event_monitor_fd)
> >+{
> >+    /* Event json string must always terminate with null char.
> >+     * So, reserve one byte for '\0' at the end.
> >+     */
> >+    size_t max_sz = CH_EVENT_BUFFER_SZ - 1;
> >+    char *buf = mon->event_buffer.buffer;
> >+    virDomainObj *vm = mon->vm;
> >+    bool incomplete = false;
> >+    size_t sz = 0;
> >+
> >+    memset(buf, 0, max_sz);
> >+    do {
> >+        ssize_t ret;
> >+
> >+        ret = read(event_monitor_fd, buf + sz, max_sz - sz);
> >+        if (ret == 0 || (ret < 0 && errno == EINTR)) {
> >+            g_usleep(G_USEC_PER_SEC);
> >+            continue;
> >+        } else if (ret < 0) {
> >+            /* We should never reach here. read(2) says possible errors
> >+             * are EINTR, EAGAIN, EBADF, EFAULT, EINVAL, EIO, EISDIR
> >+             * We handle EINTR gracefully. There is some serious issue
> >+             * if we encounter any of the other errors(either in our code
> >+             * or in the system). Better to bail out.
> >+             */
> >+            VIR_ERROR(_("%1$s: Failed to read ch events!: %2$s"),
> >+                      vm->def->name, g_strerror(errno));
> >+            virCHEventStopProcess(vm, VIR_DOMAIN_SHUTOFF_FAILED);
> >+            virCHStopEventHandler(mon);
> >+            return;
> >+        }
> >+
> >+        sz += ret;
> >+        mon->event_buffer.buf_fill_sz = sz;
> >+
> >+        if (virCHProcessEvents(mon) < 0)
> >+            VIR_WARN("%s: Failed to parse and process events", vm->def->name);
> >+
> >+        if (mon->event_buffer.buf_fill_sz != 0)
> >+            incomplete = true;
> >+        else
> >+            incomplete = false;
> >+        sz = mon->event_buffer.buf_fill_sz;
> >+
> >+    } while (virDomainObjIsActive(vm) && (sz < max_sz) && incomplete);
> >+
> >+    return;
> >+}
> >+
> 
> -- 
> Regards,
> Praveen K Paladugu

Regards,
Purna Pavan Chandra
Re: [PATCH v4 3/5] ch: events: Read and parse cloud-hypervisor events
Posted by Praveen K Paladugu 1 week, 3 days ago

On 12/11/2024 1:10 AM, Purna Pavan Chandra Aekkaladevi wrote:
> On Tue, Dec 10, 2024 at 10:28:41AM -0600, Praveen K Paladugu wrote:
>>
>>
>> On 12/2/2024 3:46 AM, Purna Pavan Chandra Aekkaladevi wrote:
>>> Implement `chReadProcessEvents` and `chProcessEvents` to read events from
>>> event monitor FIFO file and parse them accordingly.
>>>
>>> Signed-off-by: Purna Pavan Chandra Aekkaladevi <paekkaladevi@linux.microsoft.com>
>>> Co-authored-by: Vineeth Pillai <viremana@linux.microsoft.com>
>>> ---
>>>   src/ch/ch_events.c  | 137 +++++++++++++++++++++++++++++++++++++++++++-
>>>   src/ch/ch_events.h  |   2 +
>>>   src/ch/ch_monitor.h |   6 ++
>>>   3 files changed, 144 insertions(+), 1 deletion(-)
>>>
>>> diff --git a/src/ch/ch_events.c b/src/ch/ch_events.c
>>> index 247f99cef0..a4c2fc4130 100644
>>> --- a/src/ch/ch_events.c
>>> +++ b/src/ch/ch_events.c
>>> @@ -45,6 +45,137 @@ static int virCHEventStopProcess(virDomainObj *vm,
>>>       return 0;
>>>   }
>>> +static int virCHProcessEvents(virCHMonitor *mon)
>>> +{
>>> +    virDomainObj *vm = mon->vm;
>>> +    char *buf = mon->event_buffer.buffer;
>>> +    ssize_t sz = mon->event_buffer.buf_fill_sz;
>>> +    virJSONValue *obj = NULL;
>>> +    int blocks = 0;
>>> +    size_t i = 0;
>>> +    char *json_start;
>>> +    ssize_t start_index = -1;
>>> +    ssize_t end_index = -1;
>>> +    char tmp;
>>> +    int ret = 0;
>>> +
>>> +    while (i < sz) {
>>> +        if (buf[i] == '{') {
>>> +            blocks++;
>>> +            if (blocks == 1)
>>> +                start_index = i;
>>> +        } else if (buf[i] == '}' && blocks > 0) {
>>> +            blocks--;
>>> +            if (blocks == 0) {
>>> +                /* valid json document */
>>> +                end_index = i;
>>> +
>>> +                /* temporarily null terminate the JSON doc */
>>> +                tmp = buf[end_index + 1];
>>> +                buf[end_index + 1] = '\0';
>>> +                json_start = buf + start_index;
>>> +
>>> +                if ((obj = virJSONValueFromString(json_start))) {
>>> +                    /* Process the event string (obj) here */
>>> +                    virJSONValueFree(obj);
>>> +                } else {
>>> +                    VIR_WARN("%s: Invalid JSON event doc: %s",
>>> +                             vm->def->name, json_start);
>>> +                    ret = -1;
>> If you do encounter an invalid JSON, this implementation will continue
>> processing rest of the events. You should return from this method with
>> -1 here.
>>
> 
> Returning -1 here does not halt the thread. So, even if we were to
> return -1 here (after memmove-ing the rest of the json to the beginning
> of buf, just as done in below 'else if' block), it essentially tries to
> continue processing the rest of the events.

That is correct. I should have been clearer.
If a JSON event received here is invalid, it is very likely an issue
with the transport. In such a case, the following events could also be
corrupt. I don't see much value in processing the remaining events in
such a situation.

That is why I am suggesting that you return from this method and stop 
processing further events.

>>> +                }
>>> +
>>> +                /* replace the original character */
>>> +                buf[end_index + 1] = tmp;
>>> +                start_index = -1;
>>> +            }
>>> +        }
>>> +
>>> +        i++;
>>> +    }
>>> +
>>> +    if (start_index == -1) {
>>> +        /* We have processed all the JSON docs in the buffer */
>>> +        mon->event_buffer.buf_fill_sz = 0;
>>> +    } else if (start_index > 0) {
>>> +        /* We have an incomplete JSON doc at the end of the buffer
>>> +         * Move it to the start of the buffer
>>> +         */
>>> +        mon->event_buffer.buf_fill_sz = sz - start_index;
>>> +        memmove(buf, buf+start_index, mon->event_buffer.buf_fill_sz);
>>> +    }
>>> +
>>> +    return ret;
>>> +}
>>> +
>>> +static void virCHReadProcessEvents(virCHMonitor *mon,
>>> +                                   int event_monitor_fd)
>>> +{
>>> +    /* Event json string must always terminate with null char.
>>> +     * So, reserve one byte for '\0' at the end.
>>> +     */
>>> +    size_t max_sz = CH_EVENT_BUFFER_SZ - 1;
>>> +    char *buf = mon->event_buffer.buffer;
>>> +    virDomainObj *vm = mon->vm;
>>> +    bool incomplete = false;
>>> +    size_t sz = 0;
>>> +
>>> +    memset(buf, 0, max_sz);
>>> +    do {
>>> +        ssize_t ret;
>>> +
>>> +        ret = read(event_monitor_fd, buf + sz, max_sz - sz);
>>> +        if (ret == 0 || (ret < 0 && errno == EINTR)) {
>>> +            g_usleep(G_USEC_PER_SEC);
>>> +            continue;
>>> +        } else if (ret < 0) {
>>> +            /* We should never reach here. read(2) says possible errors
>>> +             * are EINTR, EAGAIN, EBADF, EFAULT, EINVAL, EIO, EISDIR
>>> +             * We handle EINTR gracefully. There is some serious issue
>>> +             * if we encounter any of the other errors(either in our code
>>> +             * or in the system). Better to bail out.
>>> +             */
>>> +            VIR_ERROR(_("%1$s: Failed to read ch events!: %2$s"),
>>> +                      vm->def->name, g_strerror(errno));
>>> +            virCHEventStopProcess(vm, VIR_DOMAIN_SHUTOFF_FAILED);
>>> +            virCHStopEventHandler(mon);
>>> +            return;
>>> +        }
>>> +
>>> +        sz += ret;
>>> +        mon->event_buffer.buf_fill_sz = sz;
>>> +
>>> +        if (virCHProcessEvents(mon) < 0)
>>> +            VIR_WARN("%s: Failed to parse and process events", vm->def->name);
>>> +
>>> +        if (mon->event_buffer.buf_fill_sz != 0)
>>> +            incomplete = true;
>>> +        else
>>> +            incomplete = false;
>>> +        sz = mon->event_buffer.buf_fill_sz;
>>> +
>>> +    } while (virDomainObjIsActive(vm) && (sz < max_sz) && incomplete);
>>> +
>>> +    return;
>>> +}
>>> +
>>
>> -- 
>> Regards,
>> Praveen K Paladugu
> 
> Regards,
> Purna Pavan Chandra

-- 
Regards,
Praveen K Paladugu