[PULL 16/42] tests/functional: Allow asset downloading with concurrent threads

Thomas Huth posted 42 patches 2 months, 2 weeks ago
[PULL 16/42] tests/functional: Allow asset downloading with concurrent threads
Posted by Thomas Huth 2 months, 2 weeks ago
When running "make -j$(nproc) check-functional", tests that use the
same asset might be running in parallel. Improve the downloading to
detect this situation and wait for the other thread to finish the
download.

Message-ID: <20240830133841.142644-17-thuth@redhat.com>
Signed-off-by: Thomas Huth <thuth@redhat.com>
---
 tests/functional/qemu_test/asset.py | 62 ++++++++++++++++++++++++-----
 1 file changed, 51 insertions(+), 11 deletions(-)

diff --git a/tests/functional/qemu_test/asset.py b/tests/functional/qemu_test/asset.py
index b329ab7dbe..d3be2aff82 100644
--- a/tests/functional/qemu_test/asset.py
+++ b/tests/functional/qemu_test/asset.py
@@ -12,6 +12,7 @@
 import sys
 import unittest
 import urllib.request
+from time import sleep
 from pathlib import Path
 from shutil import copyfileobj
 
@@ -55,6 +56,35 @@ def _check(self, cache_file):
     def valid(self):
         return self.cache_file.exists() and self._check(self.cache_file)
 
+    def _wait_for_other_download(self, tmp_cache_file):
+        # Another thread already seems to download the asset, so wait until
+        # it is done, while also checking the size to see whether it is stuck
+        try:
+            current_size = tmp_cache_file.stat().st_size
+            new_size = current_size
+        except:
+            if os.path.exists(self.cache_file):
+                return True
+            raise
+        waittime = lastchange = 600
+        while waittime > 0:
+            sleep(1)
+            waittime -= 1
+            try:
+                new_size = tmp_cache_file.stat().st_size
+            except:
+                if os.path.exists(self.cache_file):
+                    return True
+                raise
+            if new_size != current_size:
+                lastchange = waittime
+                current_size = new_size
+            elif lastchange - waittime > 90:
+                return False
+
+        self.log.debug("Time out while waiting for %s!", tmp_cache_file)
+        raise
+
     def fetch(self):
         if not self.cache_dir.exists():
             self.cache_dir.mkdir(parents=True, exist_ok=True)
@@ -70,18 +100,28 @@ def fetch(self):
         self.log.info("Downloading %s to %s...", self.url, self.cache_file)
         tmp_cache_file = self.cache_file.with_suffix(".download")
 
-        try:
-            resp = urllib.request.urlopen(self.url)
-        except Exception as e:
-            self.log.error("Unable to download %s: %s", self.url, e)
-            raise
+        for retries in range(3):
+            try:
+                with tmp_cache_file.open("xb") as dst:
+                    with urllib.request.urlopen(self.url) as resp:
+                        copyfileobj(resp, dst)
+                break
+            except FileExistsError:
+                self.log.debug("%s already exists, "
+                               "waiting for other thread to finish...",
+                               tmp_cache_file)
+                if self._wait_for_other_download(tmp_cache_file):
+                    return str(self.cache_file)
+                self.log.debug("%s seems to be stale, "
+                               "deleting and retrying download...",
+                               tmp_cache_file)
+                tmp_cache_file.unlink()
+                continue
+            except Exception as e:
+                self.log.error("Unable to download %s: %s", self.url, e)
+                tmp_cache_file.unlink()
+                raise
 
-        try:
-            with tmp_cache_file.open("wb+") as dst:
-                copyfileobj(resp, dst)
-        except:
-            tmp_cache_file.unlink()
-            raise
         try:
             # Set these just for informational purposes
             os.setxattr(str(tmp_cache_file), "user.qemu-asset-url",
-- 
2.46.0