[PATCH 11/11] scripts/ghes_inject: add a script to generate GHES error inject

Mauro Carvalho Chehab posted 11 patches 11 months ago
There is a newer version of this series
[PATCH 11/11] scripts/ghes_inject: add a script to generate GHES error inject
Posted by Mauro Carvalho Chehab 11 months ago
Using the QMP GHESv2 API requires preparing a raw data array
containing a CPER record.

Add a helper script with subcommands to prepare such data.

Currently, only ARM Processor error CPER record is supported.

Signed-off-by: Mauro Carvalho Chehab <mchehab+huawei@kernel.org>
---
 MAINTAINERS                    |   3 +
 scripts/arm_processor_error.py | 377 ++++++++++++++++++
 scripts/ghes_inject.py         |  51 +++
 scripts/qmp_helper.py          | 702 +++++++++++++++++++++++++++++++++
 4 files changed, 1133 insertions(+)
 create mode 100644 scripts/arm_processor_error.py
 create mode 100755 scripts/ghes_inject.py
 create mode 100644 scripts/qmp_helper.py

diff --git a/MAINTAINERS b/MAINTAINERS
index 8e1f662fa0e0..99a9ba5c2ace 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -2081,6 +2081,9 @@ S: Maintained
 F: hw/arm/ghes_cper.c
 F: hw/acpi/ghes_cper_stub.c
 F: qapi/acpi-hest.json
+F: scripts/ghes_inject.py
+F: scripts/arm_processor_error.py
+F: scripts/qmp_helper.py
 
 ppc4xx
 L: qemu-ppc@nongnu.org
diff --git a/scripts/arm_processor_error.py b/scripts/arm_processor_error.py
new file mode 100644
index 000000000000..62e0c5662232
--- /dev/null
+++ b/scripts/arm_processor_error.py
@@ -0,0 +1,377 @@
+#!/usr/bin/env python3
+#
+# pylint: disable=C0301,C0114,R0903,R0912,R0913,R0914,R0915,W0511
+# SPDX-License-Identifier: GPL-2.0
+#
+# Copyright (C) 2024 Mauro Carvalho Chehab <mchehab+huawei@kernel.org>
+
+# TODO: current implementation has dummy defaults.
+#
+# For a better implementation, a QMP addition/call is needed to
+# retrieve some data for ARM Processor Error injection:
+#
+#   - ARM registers: power_state, mpidr.
+
+import argparse
+import re
+
+from qmp_helper import qmp, util, cper_guid
+
+class ArmProcessorEinj:
+    """
+    Implements ARM Processor Error injection via GHES
+    """
+
+    DESC = """
+    Generates an ARM processor error CPER, compatible with
+    UEFI 2.9A Errata.
+    """
+
+    ACPI_GHES_ARM_CPER_LENGTH = 40
+    ACPI_GHES_ARM_CPER_PEI_LENGTH = 32
+
+    # Context types
+    CONTEXT_AARCH32_EL1 = 1
+    CONTEXT_AARCH64_EL1 = 5
+    CONTEXT_MISC_REG = 8
+
+    def __init__(self, subparsers):
+        """Initialize the error injection class and add subparser"""
+
+        # Valid choice values
+        self.arm_valid_bits = {
+            "mpidr":    util.bit(0),
+            "affinity": util.bit(1),
+            "running":  util.bit(2),
+            "vendor":   util.bit(3),
+        }
+
+        self.pei_flags = {
+            "first":        util.bit(0),
+            "last":         util.bit(1),
+            "propagated":   util.bit(2),
+            "overflow":     util.bit(3),
+        }
+
+        self.pei_error_types = {
+            "cache":        util.bit(1),
+            "tlb":          util.bit(2),
+            "bus":          util.bit(3),
+            "micro-arch":   util.bit(4),
+        }
+
+        self.pei_valid_bits = {
+            "multiple-error":   util.bit(0),
+            "flags":            util.bit(1),
+            "error-info":       util.bit(2),
+            "virt-addr":        util.bit(3),
+            "phy-addr":         util.bit(4),
+        }
+
+        self.data = bytearray()
+
+        parser = subparsers.add_parser("arm", description=self.DESC)
+
+        arm_valid_bits = ",".join(self.arm_valid_bits.keys())
+        flags = ",".join(self.pei_flags.keys())
+        error_types = ",".join(self.pei_error_types.keys())
+        pei_valid_bits = ",".join(self.pei_valid_bits.keys())
+
+        # UEFI N.16 ARM Validation bits
+        g_arm = parser.add_argument_group("ARM processor")
+        g_arm.add_argument("--arm", "--arm-valid",
+                           help=f"ARM valid bits: {arm_valid_bits}")
+        g_arm.add_argument("-a", "--affinity",  "--level", "--affinity-level",
+                           type=lambda x: int(x, 0),
+                           help="Affinity level (when multiple levels apply)")
+        g_arm.add_argument("-l", "--mpidr", type=lambda x: int(x, 0),
+                           help="Multiprocessor Affinity Register")
+        g_arm.add_argument("-i", "--midr", type=lambda x: int(x, 0),
+                           help="Main ID Register")
+        g_arm.add_argument("-r", "--running",
+                           action=argparse.BooleanOptionalAction,
+                           default=None,
+                           help="Indicates if the processor is running or not")
+        g_arm.add_argument("--psci", "--psci-state",
+                           type=lambda x: int(x, 0),
+                           help="Power State Coordination Interface - PSCI state")
+
+        # TODO: Add vendor-specific support
+
+        # UEFI N.17 bitmaps (type and flags)
+        g_pei = parser.add_argument_group("ARM Processor Error Info (PEI)")
+        g_pei.add_argument("-t", "--type", nargs="+",
+                        help=f"one or more error types: {error_types}")
+        g_pei.add_argument("-f", "--flags", nargs="*",
+                        help=f"zero or more error flags: {flags}")
+        g_pei.add_argument("-V", "--pei-valid", "--error-valid", nargs="*",
+                        help=f"zero or more PEI valid bits: {pei_valid_bits}")
+
+        # UEFI N.17 Integer values
+        g_pei.add_argument("-m", "--multiple-error", nargs="+",
+                        help="Number of errors: 0: Single error, 1: Multiple errors, 2-65535: Error count if known")
+        g_pei.add_argument("-e", "--error-info", nargs="+",
+                        help="Error information (UEFI 2.10 tables N.18 to N.20)")
+        g_pei.add_argument("-p", "--physical-address",  nargs="+",
+                        help="Physical address")
+        g_pei.add_argument("-v", "--virtual-address",  nargs="+",
+                        help="Virtual address")
+
+        # UEFI N.21 Context
+        g_ctx = parser.add_argument_group("Processor Context")
+        g_ctx.add_argument("--ctx-type", "--context-type", nargs="*",
+                        help="Type of the context (0=ARM32 GPR, 5=ARM64 EL1, other values supported)")
+        g_ctx.add_argument("--ctx-size", "--context-size", nargs="*",
+                        help="Minimal size of the context")
+        g_ctx.add_argument("--ctx-array", "--context-array", nargs="*",
+                        help="Comma-separated arrays for each context")
+
+        # Vendor-specific data
+        g_vendor = parser.add_argument_group("Vendor-specific data")
+        g_vendor.add_argument("--vendor", "--vendor-specific", nargs="+",
+                        help="Vendor-specific byte arrays of data")
+
+        # Add arguments for Generic Error Data
+        qmp.argparse(parser)
+
+        parser.set_defaults(func=self.send_cper)
+
+    def send_cper(self, args):
+        """Parse subcommand arguments and send a CPER via QMP"""
+
+        qmp_cmd = qmp(args.host, args.port, args.debug)
+
+        # Handle Generic Error Data arguments if any
+        qmp_cmd.set_args(args)
+
+        is_cpu_type = re.compile(r"^([\w+]+\-)?arm\-cpu$")
+        cpus = qmp_cmd.search_qom("/machine/unattached/device",
+                                  "type", is_cpu_type)
+
+        cper = {}
+        pei = {}
+        ctx = {}
+        vendor = {}
+
+        arg = vars(args)
+
+        # Handle global parameters
+        if args.arm:
+            arm_valid_init = False
+            cper["valid"] = util.get_choice(name="valid",
+                                       value=args.arm,
+                                       choices=self.arm_valid_bits,
+                                       suffixes=["-error", "-err"])
+        else:
+            cper["valid"] = 0
+            arm_valid_init = True
+
+        if "running" in arg:
+            if args.running:
+                cper["running-state"] = util.bit(0)
+            else:
+                cper["running-state"] = 0
+        else:
+            cper["running-state"] = 0
+
+        if arm_valid_init:
+            if args.affinity:
+                cper["valid"] |= self.arm_valid_bits["affinity"]
+
+            if args.mpidr:
+                cper["valid"] |= self.arm_valid_bits["mpidr"]
+
+            if "running-state" in cper:
+                cper["valid"] |= self.arm_valid_bits["running"]
+
+            if args.psci:
+                cper["valid"] |= self.arm_valid_bits["running"]
+
+        # Handle PEI
+        if not args.type:
+            args.type = ["cache-error"]
+
+        util.get_mult_choices(
+            pei,
+            name="valid",
+            values=args.pei_valid,
+            choices=self.pei_valid_bits,
+            suffixes=["-valid", "--addr"],
+        )
+        util.get_mult_choices(
+            pei,
+            name="type",
+            values=args.type,
+            choices=self.pei_error_types,
+            suffixes=["-error", "-err"],
+        )
+        util.get_mult_choices(
+            pei,
+            name="flags",
+            values=args.flags,
+            choices=self.pei_flags,
+            suffixes=["-error", "-cap"],
+        )
+        util.get_mult_int(pei, "error-info", args.error_info)
+        util.get_mult_int(pei, "multiple-error", args.multiple_error)
+        util.get_mult_int(pei, "phy-addr", args.physical_address)
+        util.get_mult_int(pei, "virt-addr", args.virtual_address)
+
+        # Handle context
+        util.get_mult_int(ctx, "type", args.ctx_type, allow_zero=True)
+        util.get_mult_int(ctx, "minimal-size", args.ctx_size, allow_zero=True)
+        util.get_mult_array(ctx, "register", args.ctx_array, allow_zero=True)
+
+        util.get_mult_array(vendor, "bytes", args.vendor, max_val=255)
+
+        # Store PEI
+        pei_data = bytearray()
+        default_flags  = self.pei_flags["first"]
+        default_flags |= self.pei_flags["last"]
+
+        error_info_num = 0
+
+        for i, p in pei.items():        # pylint: disable=W0612
+            error_info_num += 1
+
+            # UEFI 2.10 doesn't define how to encode error information
+            # when multiple types are raised. So, provide a default only
+            # if a single type is there
+            if "error-info" not in p:
+                if p["type"] == util.bit(1):
+                    p["error-info"] = 0x0091000F
+                if p["type"] == util.bit(2):
+                    p["error-info"] = 0x0054007F
+                if p["type"] == util.bit(3):
+                    p["error-info"] = 0x80D6460FFF
+                if p["type"] == util.bit(4):
+                    p["error-info"] = 0x78DA03FF
+
+            if "valid" not in p:
+                p["valid"] = 0
+                if "multiple-error" in p:
+                    p["valid"] |= self.pei_valid_bits["multiple-error"]
+
+                if "flags" in p:
+                    p["valid"] |= self.pei_valid_bits["flags"]
+
+                if "error-info" in p:
+                    p["valid"] |= self.pei_valid_bits["error-info"]
+
+                if "phy-addr" in p:
+                    p["valid"] |= self.pei_valid_bits["phy-addr"]
+
+                if "virt-addr" in p:
+                    p["valid"] |= self.pei_valid_bits["virt-addr"]
+
+            # Version
+            util.data_add(pei_data, 0, 1)
+
+            util.data_add(pei_data,
+                         self.ACPI_GHES_ARM_CPER_PEI_LENGTH, 1)
+
+            util.data_add(pei_data, p["valid"], 2)
+            util.data_add(pei_data, p["type"], 1)
+            util.data_add(pei_data, p.get("multiple-error", 1), 2)
+            util.data_add(pei_data, p.get("flags", default_flags), 1)
+            util.data_add(pei_data, p.get("error-info", 0), 8)
+            util.data_add(pei_data, p.get("virt-addr", 0xDEADBEEF), 8)
+            util.data_add(pei_data, p.get("phy-addr", 0xABBA0BAD), 8)
+
+        # Store Context
+        ctx_data = bytearray()
+        context_info_num = 0
+
+        if ctx:
+            ret = qmp_cmd.send_cmd("query-target", may_open=True)
+
+            default_ctx = self.CONTEXT_MISC_REG
+
+            if "arch" in ret:
+                if ret["arch"] == "aarch64":
+                    default_ctx = self.CONTEXT_AARCH64_EL1
+                elif ret["arch"] == "arm":
+                    default_ctx = self.CONTEXT_AARCH32_EL1
+
+            for k in sorted(ctx.keys()):
+                context_info_num += 1
+
+                if "type" not in ctx[k]:
+                    ctx[k]["type"] = default_ctx
+
+                if "register" not in ctx[k]:
+                    ctx[k]["register"] = []
+
+                reg_size = len(ctx[k]["register"])
+                size = 0
+
+                if "minimal-size" in ctx:
+                    size = ctx[k]["minimal-size"]
+
+                size = max(size, reg_size)
+
+                size = (size + 1) % 0xFFFE
+
+                # Version
+                util.data_add(ctx_data, 0, 2)
+
+                util.data_add(ctx_data, ctx[k]["type"], 2)
+
+                util.data_add(ctx_data, 8 * size, 4)
+
+                for r in ctx[k]["register"]:
+                    util.data_add(ctx_data, r, 8)
+
+                for i in range(reg_size, size):   # pylint: disable=W0612
+                    util.data_add(ctx_data, 0, 8)
+
+        # Vendor-specific bytes are not grouped
+        vendor_data = bytearray()
+        if vendor:
+            for k in sorted(vendor.keys()):
+                for b in vendor[k]["bytes"]:
+                    util.data_add(vendor_data, b, 1)
+
+        # Encode ARM Processor Error
+        data = bytearray()
+
+        util.data_add(data, cper["valid"], 4)
+
+        util.data_add(data, error_info_num, 2)
+        util.data_add(data, context_info_num, 2)
+
+        # Calculate the length of the CPER data
+        cper_length = self.ACPI_GHES_ARM_CPER_LENGTH
+        cper_length += len(pei_data)
+        cper_length += len(vendor_data)
+        cper_length += len(ctx_data)
+        util.data_add(data, cper_length, 4)
+
+        util.data_add(data, arg.get("affinity-level", 0), 1)
+
+        # Reserved
+        util.data_add(data, 0, 3)
+
+        if "midr-el1" not in arg:
+            if cpus:
+                cmd_arg = {
+                    'path': cpus[0],
+                    'property': "midr"
+                }
+                ret = qmp_cmd.send_cmd("qom-get", cmd_arg, may_open=True)
+                if isinstance(ret, int):
+                    arg["midr-el1"] = ret
+
+        util.data_add(data, arg.get("mpidr-el1", 0), 8)
+        util.data_add(data, arg.get("midr-el1", 0), 8)
+        util.data_add(data, cper["running-state"], 4)
+        util.data_add(data, arg.get("psci-state", 0), 4)
+
+        # Add PEI
+        data.extend(pei_data)
+        data.extend(ctx_data)
+        data.extend(vendor_data)
+
+        self.data = data
+
+        qmp_cmd.send_cper(cper_guid.CPER_PROC_ARM, self.data)
diff --git a/scripts/ghes_inject.py b/scripts/ghes_inject.py
new file mode 100755
index 000000000000..67cb6077bec8
--- /dev/null
+++ b/scripts/ghes_inject.py
@@ -0,0 +1,51 @@
+#!/usr/bin/env python3
+#
+# SPDX-License-Identifier: GPL-2.0
+#
+# Copyright (C) 2024 Mauro Carvalho Chehab <mchehab+huawei@kernel.org>
+
+"""
+Handle ACPI GHESv2 error injection logic QEMU QMP interface.
+"""
+
+import argparse
+import sys
+
+from arm_processor_error import ArmProcessorEinj
+
+EINJ_DESC = """
+Handle ACPI GHESv2 error injection logic QEMU QMP interface.
+
+It allows using UEFI BIOS EINJ features to generate GHES records.
+
+It helps testing CPER and GHES drivers at the guest OS and how
+userspace applications at the guest handle them.
+"""
+
+def main():
+    """Main program"""
+
+    # Main parser - handle generic args like QEMU QMP TCP socket options
+    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter,
+                                     usage="%(prog)s [options]",
+                                     description=EINJ_DESC)
+
+    g_options = parser.add_argument_group("QEMU QMP socket options")
+    g_options.add_argument("-H", "--host", default="localhost", type=str,
+                           help="host name")
+    g_options.add_argument("-P", "--port", default=4445, type=int,
+                           help="TCP port number")
+    g_options.add_argument('-d', '--debug', action='store_true')
+
+    subparsers = parser.add_subparsers()
+
+    ArmProcessorEinj(subparsers)
+
+    args = parser.parse_args()
+    if "func" in args:
+        args.func(args)
+    else:
+        sys.exit(f"Please specify a valid command for {sys.argv[0]}")
+
+if __name__ == "__main__":
+    main()
diff --git a/scripts/qmp_helper.py b/scripts/qmp_helper.py
new file mode 100644
index 000000000000..357ebc6e8359
--- /dev/null
+++ b/scripts/qmp_helper.py
@@ -0,0 +1,702 @@
+#!/usr/bin/env python3
+#
+# # pylint: disable=C0103,E0213,E1135,E1136,E1137,R0902,R0903,R0912,R0913
+# SPDX-License-Identifier: GPL-2.0
+#
+# Copyright (C) 2024 Mauro Carvalho Chehab <mchehab+huawei@kernel.org>
+
+"""
+Helper classes to be used by ghes_inject command classes.
+"""
+
+import json
+import sys
+
+from datetime import datetime
+from os import path as os_path
+
+try:
+    qemu_dir = os_path.abspath(os_path.dirname(os_path.dirname(__file__)))
+    sys.path.append(os_path.join(qemu_dir, 'python'))
+
+    from qemu.qmp.legacy import QEMUMonitorProtocol
+
+except ModuleNotFoundError as exc:
+    print(f"Module '{exc.name}' not found.")
+    print("Try export PYTHONPATH=top-qemu-dir/python or run from top-qemu-dir")
+    sys.exit(1)
+
+from base64 import b64encode
+
+class util:
+    """
+    Ancillary functions to deal with bitmaps, parse arguments,
+    generate GUID and encode data on a bytearray buffer.
+    """
+
+    #
+    # Helper routines to handle multiple choice arguments
+    #
+    def get_choice(name, value, choices, suffixes=None, bitmask=True):
+        """Produce a list from multiple choice argument"""
+
+        new_values = 0
+
+        if not value:
+            return new_values
+
+        for val in value.split(","):
+            val = val.lower()
+
+            if suffixes:
+                for suffix in suffixes:
+                    val = val.removesuffix(suffix)
+
+            if val not in choices.keys():
+                if suffixes:
+                    for suffix in suffixes:
+                        if val + suffix in choices.keys():
+                            val += suffix
+                            break
+
+            if val not in choices.keys():
+                sys.exit(f"Error on '{name}': choice '{val}' is invalid.")
+
+            val = choices[val]
+
+            if bitmask:
+                new_values |= val
+            else:
+                if new_values:
+                    sys.exit(f"Error on '{name}': only one value is accepted.")
+
+                new_values = val
+
+        return new_values
+
+    def get_array(name, values, max_val=None):
+        """Add numbered hashes from integer lists into an array"""
+
+        array = []
+
+        for value in values:
+            for val in value.split(","):
+                try:
+                    val = int(val, 0)
+                except ValueError:
+                    sys.exit(f"Error on '{name}': {val} is not an integer")
+
+                if val < 0:
+                    sys.exit(f"Error on '{name}': {val} is not unsigned")
+
+                if max_val and val > max_val:
+                    sys.exit(f"Error on '{name}': {val} is too little")
+
+                array.append(val)
+
+        return array
+
+    def get_mult_array(mult, name, values, allow_zero=False, max_val=None):
+        """Add numbered hashes from integer lists"""
+
+        if not allow_zero:
+            if not values:
+                return
+        else:
+            if values is None:
+                return
+
+            if not values:
+                i = 0
+                if i not in mult:
+                    mult[i] = {}
+
+                mult[i][name] = []
+                return
+
+        i = 0
+        for value in values:
+            for val in value.split(","):
+                try:
+                    val = int(val, 0)
+                except ValueError:
+                    sys.exit(f"Error on '{name}': {val} is not an integer")
+
+                if val < 0:
+                    sys.exit(f"Error on '{name}': {val} is not unsigned")
+
+                if max_val and val > max_val:
+                    sys.exit(f"Error on '{name}': {val} is too little")
+
+                if i not in mult:
+                    mult[i] = {}
+
+                if name not in mult[i]:
+                    mult[i][name] = []
+
+                mult[i][name].append(val)
+
+            i += 1
+
+
+    def get_mult_choices(mult, name, values, choices,
+                        suffixes=None, allow_zero=False):
+        """Add numbered hashes from multiple choice arguments"""
+
+        if not allow_zero:
+            if not values:
+                return
+        else:
+            if values is None:
+                return
+
+        i = 0
+        for val in values:
+            new_values = util.get_choice(name, val, choices, suffixes)
+
+            if i not in mult:
+                mult[i] = {}
+
+            mult[i][name] = new_values
+            i += 1
+
+
+    def get_mult_int(mult, name, values, allow_zero=False):
+        """Add numbered hashes from integer arguments"""
+        if not allow_zero:
+            if not values:
+                return
+        else:
+            if values is None:
+                return
+
+        i = 0
+        for val in values:
+            try:
+                val = int(val, 0)
+            except ValueError:
+                sys.exit(f"Error on '{name}': {val} is not an integer")
+
+            if val < 0:
+                sys.exit(f"Error on '{name}': {val} is not unsigned")
+
+            if i not in mult:
+                mult[i] = {}
+
+            mult[i][name] = val
+            i += 1
+
+
+    #
+    # Data encode helper functions
+    #
+    def bit(b):
+        """Simple macro to define a bit on a bitmask"""
+        return 1 << b
+
+
+    def data_add(data, value, num_bytes):
+        """Adds bytes from value inside a bitarray"""
+
+        data.extend(value.to_bytes(num_bytes, byteorder="little"))  # pylint: disable=E1101
+
+    def dump_bytearray(name, data):
+        """Does an hexdump of a byte array, grouping in bytes"""
+
+        print(f"{name} ({len(data)} bytes):")
+
+        for ln_start in range(0, len(data), 16):
+            ln_end = min(ln_start + 16, len(data))
+            print(f"      {ln_start:08x}  ", end="")
+            for i in range(ln_start, ln_end):
+                print(f"{data[i]:02x} ", end="")
+            for i in range(ln_end, ln_start + 16):
+                print("   ", end="")
+            print("  ", end="")
+            for i in range(ln_start, ln_end):
+                if data[i] >= 32 and data[i] < 127:
+                    print(chr(data[i]), end="")
+                else:
+                    print(".", end="")
+
+            print()
+        print()
+
+    def time(string):
+        """Handle BCD timestamps used on Generic Error Data Block"""
+
+        time = None
+
+        # Formats to be used when parsing time stamps
+        formats = [
+            "%Y-%m-%d %H:%M:%S",
+        ]
+
+        if string == "now":
+            time = datetime.now()
+
+        if time is None:
+            for fmt in formats:
+                try:
+                    time = datetime.strptime(string, fmt)
+                    break
+                except ValueError:
+                    pass
+
+            if time is None:
+                raise ValueError("Invalid time format")
+
+        return time
+
+class guid:
+    """
+    Simple class to handle GUID fields.
+    """
+
+    def __init__(self, time_low, time_mid, time_high, nodes):
+        """Initialize a GUID value"""
+
+        assert len(nodes) == 8
+
+        self.time_low = time_low
+        self.time_mid = time_mid
+        self.time_high = time_high
+        self.nodes = nodes
+
+    @classmethod
+    def UUID(cls, guid_str):
+        """Initialize a GUID using a string on its standard format"""
+
+        if len(guid_str) != 36:
+            print("Size not 36")
+            raise ValueError('Invalid GUID size')
+
+        # It is easier to parse without separators. So, drop them
+        guid_str = guid_str.replace('-', '')
+
+        if len(guid_str) != 32:
+            print("Size not 32", guid_str, len(guid_str))
+            raise ValueError('Invalid GUID hex size')
+
+        time_low = 0
+        time_mid = 0
+        time_high = 0
+        nodes = []
+
+        for i in reversed(range(16, 32, 2)):
+            h = guid_str[i:i + 2]
+            value = int(h, 16)
+            nodes.insert(0, value)
+
+        time_high = int(guid_str[12:16], 16)
+        time_mid = int(guid_str[8:12], 16)
+        time_low = int(guid_str[0:8], 16)
+
+        return cls(time_low, time_mid, time_high, nodes)
+
+    def __str__(self):
+        """Output a GUID value on its default string representation"""
+
+        clock = self.nodes[0] << 8 | self.nodes[1]
+
+        node = 0
+        for i in range(2, len(self.nodes)):
+            node = node << 8 | self.nodes[i]
+
+        s = f"{self.time_low:08x}-{self.time_mid:04x}-"
+        s += f"{self.time_high:04x}-{clock:04x}-{node:012x}"
+        return s
+
+    def to_bytes(self):
+        """Output a GUID value in bytes"""
+
+        data = bytearray()
+
+        util.data_add(data, self.time_low, 4)
+        util.data_add(data, self.time_mid, 2)
+        util.data_add(data, self.time_high, 2)
+        data.extend(bytearray(self.nodes))
+
+        return data
+
+class qmp:
+    """
+    Opens a connection and send/receive QMP commands.
+    """
+
+    def send_cmd(self, command, args=None, may_open=False, return_error=True):
+        """Send a command to QMP, optinally opening a connection"""
+
+        if may_open:
+            self._connect()
+        elif not self.connected:
+            return False
+
+        msg = { 'execute': command }
+        if args:
+            msg['arguments'] = args
+
+        try:
+            obj = self.qmp_monitor.cmd_obj(msg)
+        # Can we use some other exception class here?
+        except Exception as e:                         # pylint: disable=W0718
+            print(f"Command: {command}")
+            print(f"Failed to inject error: {e}.")
+            return None
+
+        if "return" in obj:
+            if isinstance(obj.get("return"), dict):
+                if obj["return"]:
+                    return obj["return"]
+                return "OK"
+
+            return obj["return"]
+
+        if isinstance(obj.get("error"), dict):
+            error = obj["error"]
+            if return_error:
+                print(f"Command: {msg}")
+                print(f'{error["class"]}: {error["desc"]}')
+        else:
+            print(json.dumps(obj))
+
+        return None
+
+    def _close(self):
+        """Shutdown and close the socket, if opened"""
+        if not self.connected:
+            return
+
+        self.qmp_monitor.close()
+        self.connected = False
+
+    def _connect(self):
+        """Connect to a QMP TCP/IP port, if not connected yet"""
+
+        if self.connected:
+            return True
+
+        try:
+            self.qmp_monitor.connect(negotiate=True)
+        except ConnectionError:
+            sys.exit(f"Can't connect to QMP host {self.host}:{self.port}")
+
+        self.connected = True
+
+        return True
+
+    BLOCK_STATUS_BITS = {
+        "uncorrectable":            util.bit(0),
+        "correctable":              util.bit(1),
+        "multi-uncorrectable":      util.bit(2),
+        "multi-correctable":        util.bit(3),
+    }
+
+    ERROR_SEVERITY = {
+        "recoverable":  0,
+        "fatal":        1,
+        "corrected":    2,
+        "none":         3,
+    }
+
+    VALIDATION_BITS = {
+        "fru-id":       util.bit(0),
+        "fru-text":     util.bit(1),
+        "timestamp":    util.bit(2),
+    }
+
+    GEDB_FLAGS_BITS = {
+        "recovered":    util.bit(0),
+        "prev-error":   util.bit(1),
+        "simulated":    util.bit(2),
+    }
+
+    GENERIC_DATA_SIZE = 72
+
+    def argparse(parser):
+        """Prepare a parser group to query generic error data"""
+
+        block_status_bits = ",".join(qmp.BLOCK_STATUS_BITS.keys())
+        error_severity_enum = ",".join(qmp.ERROR_SEVERITY.keys())
+        validation_bits = ",".join(qmp.VALIDATION_BITS.keys())
+        gedb_flags_bits = ",".join(qmp.GEDB_FLAGS_BITS.keys())
+
+        g_gen = parser.add_argument_group("Generic Error Data")  # pylint: disable=E1101
+        g_gen.add_argument("--block-status",
+                           help=f"block status bits: {block_status_bits}")
+        g_gen.add_argument("--raw-data", nargs="+",
+                        help="Raw data inside the Error Status Block")
+        g_gen.add_argument("--error-severity", "--severity",
+                           help=f"error severity: {error_severity_enum}")
+        g_gen.add_argument("--gen-err-valid-bits",
+                           "--generic-error-validation-bits",
+                           help=f"validation bits: {validation_bits}")
+        g_gen.add_argument("--fru-id", type=guid.UUID,
+                           help="GUID representing a physical device")
+        g_gen.add_argument("--fru-text",
+                           help="ASCII string identifying the FRU hardware")
+        g_gen.add_argument("--timestamp", type=util.time,
+                           help="Time when the error info was collected")
+        g_gen.add_argument("--precise", "--precise-timestamp",
+                           action='store_true',
+                           help="Marks the timestamp as precise if --timestamp is used")
+        g_gen.add_argument("--gedb-flags",
+                           help=f"General Error Data Block flags: {gedb_flags_bits}")
+
+    def set_args(self, args):
+        """Set the arguments optionally defined via self.argparse()"""
+
+        if args.block_status:
+            self.block_status = util.get_choice(name="block-status",
+                                                value=args.block_status,
+                                                choices=self.BLOCK_STATUS_BITS,
+                                                bitmask=False)
+        if args.raw_data:
+            self.raw_data = util.get_array("raw-data", args.raw_data,
+                                           max_val=255)
+            print(self.raw_data)
+
+        if args.error_severity:
+            self.error_severity = util.get_choice(name="error-severity",
+                                                  value=args.error_severity,
+                                                  choices=self.ERROR_SEVERITY,
+                                                  bitmask=False)
+
+        if args.fru_id:
+            self.fru_id = args.fru_id.to_bytes()
+            if not args.gen_err_valid_bits:
+                self.validation_bits |= self.VALIDATION_BITS["fru-id"]
+
+        if args.fru_text:
+            text = bytearray(args.fru_text.encode('ascii'))
+            if len(text) > 20:
+                sys.exit("FRU text is too big to fit")
+
+            self.fru_text = text
+            if not args.gen_err_valid_bits:
+                self.validation_bits |= self.VALIDATION_BITS["fru-text"]
+
+        if args.timestamp:
+            time = args.timestamp
+            century = int(time.year / 100)
+
+            bcd = bytearray()
+            util.data_add(bcd, (time.second // 10) << 4 | (time.second % 10), 1)
+            util.data_add(bcd, (time.minute // 10) << 4 | (time.minute % 10), 1)
+            util.data_add(bcd, (time.hour // 10) << 4 | (time.hour % 10), 1)
+
+            if args.precise:
+                util.data_add(bcd, 1, 1)
+            else:
+                util.data_add(bcd, 0, 1)
+
+            util.data_add(bcd, (time.day // 10) << 4 | (time.day % 10), 1)
+            util.data_add(bcd, (time.month // 10) << 4 | (time.month % 10), 1)
+            util.data_add(bcd,
+                          ((time.year % 100) // 10) << 4 | (time.year % 10), 1)
+            util.data_add(bcd, ((century % 100) // 10) << 4 | (century % 10), 1)
+
+            self.timestamp = bcd
+            if not args.gen_err_valid_bits:
+                self.validation_bits |= self.VALIDATION_BITS["timestamp"]
+
+        if args.gen_err_valid_bits:
+            self.validation_bits = util.get_choice(name="validation",
+                                                   value=args.gen_err_valid_bits,
+                                                   choices=self.VALIDATION_BITS)
+
+    def __init__(self, host, port, debug=False):
+        """Initialize variables used by the QMP send logic"""
+
+        self.connected = False
+        self.host = host
+        self.port = port
+        self.debug = debug
+
+        # ACPI 6.1: 18.3.2.7.1 Generic Error Data: Generic Error Status Block
+        self.block_status = self.BLOCK_STATUS_BITS["uncorrectable"]
+        self.raw_data = []
+        self.error_severity = self.ERROR_SEVERITY["recoverable"]
+
+        # ACPI 6.1: 18.3.2.7.1 Generic Error Data: Generic Error Data Entry
+        self.validation_bits = 0
+        self.flags = 0
+        self.fru_id = bytearray(16)
+        self.fru_text = bytearray(20)
+        self.timestamp = bytearray(8)
+
+        self.qmp_monitor = QEMUMonitorProtocol(address=(self.host, self.port))
+
+    #
+    # Socket QMP send command
+    #
+    def send_cper_raw(self, cper_data):
+        """Send a raw CPER data to QEMU though QMP TCP socket"""
+
+        data = b64encode(bytes(cper_data)).decode('ascii')
+
+        cmd_arg = {
+            'cper': data
+        }
+
+        self._connect()
+
+        if self.send_cmd("inject-ghes-error", cmd_arg):
+            print("Error injected.")
+
+    def send_cper(self, notif_type, payload):
+        """Send commands to QEMU though QMP TCP socket"""
+
+        # Fill CPER record header
+
+        # NOTE: bits 4 to 13 of block status contain the number of
+        # data entries in the data section. This is currently unsupported.
+
+        cper_length = len(payload)
+        data_length = cper_length + len(self.raw_data) + self.GENERIC_DATA_SIZE
+
+        #  Generic Error Data Entry
+        gede = bytearray()
+
+        gede.extend(notif_type.to_bytes())
+        util.data_add(gede, self.error_severity, 4)
+        util.data_add(gede, 0x300, 2)
+        util.data_add(gede, self.validation_bits, 1)
+        util.data_add(gede, self.flags, 1)
+        util.data_add(gede, cper_length, 4)
+        gede.extend(self.fru_id)
+        gede.extend(self.fru_text)
+        gede.extend(self.timestamp)
+
+        # Generic Error Status Block
+        gebs = bytearray()
+
+        if self.raw_data:
+            raw_data_offset = len(gebs)
+        else:
+            raw_data_offset = 0
+
+        util.data_add(gebs, self.block_status, 4)
+        util.data_add(gebs, raw_data_offset, 4)
+        util.data_add(gebs, len(self.raw_data), 4)
+        util.data_add(gebs, data_length, 4)
+        util.data_add(gebs, self.error_severity, 4)
+
+        cper_data = bytearray()
+        cper_data.extend(gebs)
+        cper_data.extend(gede)
+        cper_data.extend(bytearray(self.raw_data))
+        cper_data.extend(bytearray(payload))
+
+        if self.debug:
+            print(f"GUID: {notif_type}")
+
+            util.dump_bytearray("Generic Error Status Block", gebs)
+            util.dump_bytearray("Generic Error Data Entry", gede)
+
+            if self.raw_data:
+                util.dump_bytearray("Raw data", bytearray(self.raw_data))
+
+            util.dump_bytearray("Payload", payload)
+
+        self.send_cper_raw(cper_data)
+
+
+    def search_qom(self, path, prop, regex):
+        """
+        Return a list of devices that match path array like:
+
+            /machine/unattached/device
+            /machine/peripheral-anon/device
+            ...
+        """
+
+        found = []
+
+        i = 0
+        while 1:
+            dev = f"{path}[{i}]"
+            args = {
+                'path': dev,
+                'property': prop
+            }
+            ret = self.send_cmd("qom-get", args, may_open=True, return_error=False)
+            if not ret:
+                break
+
+            if isinstance(ret, str):
+                if regex.search(ret):
+                    found.append(dev)
+
+            i += 1
+            if i > 10000:
+                print("Too many objects returned by qom-get!")
+                break
+
+        return found
+
+class cper_guid:
+    """
+    Contains CPER GUID, as per:
+    https://uefi.org/specs/UEFI/2.10/Apx_N_Common_Platform_Error_Record.html
+    """
+
+    CPER_PROC_GENERIC =  guid(0x9876CCAD, 0x47B4, 0x4bdb,
+                              [0xB6, 0x5E, 0x16, 0xF1,
+                               0x93, 0xC4, 0xF3, 0xDB])
+
+    CPER_PROC_X86 = guid(0xDC3EA0B0, 0xA144, 0x4797,
+                         [0xB9, 0x5B, 0x53, 0xFA,
+                          0x24, 0x2B, 0x6E, 0x1D])
+
+    CPER_PROC_ITANIUM = guid(0xe429faf1, 0x3cb7, 0x11d4,
+                             [0xbc, 0xa7, 0x00, 0x80,
+                              0xc7, 0x3c, 0x88, 0x81])
+
+    CPER_PROC_ARM = guid(0xE19E3D16, 0xBC11, 0x11E4,
+                         [0x9C, 0xAA, 0xC2, 0x05,
+                          0x1D, 0x5D, 0x46, 0xB0])
+
+    CPER_PLATFORM_MEM = guid(0xA5BC1114, 0x6F64, 0x4EDE,
+                             [0xB8, 0x63, 0x3E, 0x83,
+                              0xED, 0x7C, 0x83, 0xB1])
+
+    CPER_PLATFORM_MEM2 = guid(0x61EC04FC, 0x48E6, 0xD813,
+                              [0x25, 0xC9, 0x8D, 0xAA,
+                               0x44, 0x75, 0x0B, 0x12])
+
+    CPER_PCIE = guid(0xD995E954, 0xBBC1, 0x430F,
+                     [0xAD, 0x91, 0xB4, 0x4D,
+                      0xCB, 0x3C, 0x6F, 0x35])
+
+    CPER_PCI_BUS = guid(0xC5753963, 0x3B84, 0x4095,
+                        [0xBF, 0x78, 0xED, 0xDA,
+                         0xD3, 0xF9, 0xC9, 0xDD])
+
+    CPER_PCI_DEV = guid(0xEB5E4685, 0xCA66, 0x4769,
+                        [0xB6, 0xA2, 0x26, 0x06,
+                         0x8B, 0x00, 0x13, 0x26])
+
+    CPER_FW_ERROR = guid(0x81212A96, 0x09ED, 0x4996,
+                         [0x94, 0x71, 0x8D, 0x72,
+                          0x9C, 0x8E, 0x69, 0xED])
+
+    CPER_DMA_GENERIC = guid(0x5B51FEF7, 0xC79D, 0x4434,
+                            [0x8F, 0x1B, 0xAA, 0x62,
+                             0xDE, 0x3E, 0x2C, 0x64])
+
+    CPER_DMA_VT = guid(0x71761D37, 0x32B2, 0x45cd,
+                       [0xA7, 0xD0, 0xB0, 0xFE,
+                        0xDD, 0x93, 0xE8, 0xCF])
+
+    CPER_DMA_IOMMU = guid(0x036F84E1, 0x7F37, 0x428c,
+                         [0xA7, 0x9E, 0x57, 0x5F,
+                          0xDF, 0xAA, 0x84, 0xEC])
+
+    CPER_CCIX_PER = guid(0x91335EF6, 0xEBFB, 0x4478,
+                         [0xA6, 0xA6, 0x88, 0xB7,
+                          0x28, 0xCF, 0x75, 0xD7])
+
+    CPER_CXL_PROT_ERR = guid(0x80B9EFB4, 0x52B5, 0x4DE3,
+                             [0xA7, 0x77, 0x68, 0x78,
+                              0x4B, 0x77, 0x10, 0x48])
-- 
2.48.1
Re: [PATCH 11/11] scripts/ghes_inject: add a script to generate GHES error inject
Posted by Jonathan Cameron via 11 months ago
On Wed, 22 Jan 2025 16:46:28 +0100
Mauro Carvalho Chehab <mchehab+huawei@kernel.org> wrote:

> Using the QMP GHESv2 API requires preparing a raw data array
> containing a CPER record.
> 
> Add a helper script with subcommands to prepare such data.
> 
> Currently, only ARM Processor error CPER record is supported.
> 
> Signed-off-by: Mauro Carvalho Chehab <mchehab+huawei@kernel.org>
My Python is poor at best, so take that into account!

Some usage examples might be good to have as well.
Either in explicit docs or the patch description.

Jonathan


> ---
>  MAINTAINERS                    |   3 +
>  scripts/arm_processor_error.py | 377 ++++++++++++++++++
>  scripts/ghes_inject.py         |  51 +++
>  scripts/qmp_helper.py          | 702 +++++++++++++++++++++++++++++++++
>  4 files changed, 1133 insertions(+)
>  create mode 100644 scripts/arm_processor_error.py
>  create mode 100755 scripts/ghes_inject.py
>  create mode 100644 scripts/qmp_helper.py
> 

> diff --git a/scripts/qmp_helper.py b/scripts/qmp_helper.py
> new file mode 100644
> index 000000000000..357ebc6e8359
> --- /dev/null
> +++ b/scripts/qmp_helper.py



> +    def send_cper(self, notif_type, payload):
> +        """Send commands to QEMU though QMP TCP socket"""
> +
> +        # Fill CPER record header
> +
> +        # NOTE: bits 4 to 13 of block status contain the number of
> +        # data entries in the data section. This is currently unsupported.

Not controllable, so always set to 0 or 1?  Or not set?

...


> +        self.send_cper_raw(cper_data)
> +

Trivial but maybe can improve consistency on spacing.  1 or 2 lines
between functions. I don't care which.

> +
> +    def search_qom(self, path, prop, regex):

> +class cper_guid:
> +    """
> +    Contains CPER GUID, as per:
> +    https://uefi.org/specs/UEFI/2.10/Apx_N_Common_Platform_Error_Record.html
> +    """

> +
> +    CPER_PROC_X86 = guid(0xDC3EA0B0, 0xA144, 0x4797,

Maybe follow the spec naming as IA32_X64?

> +                         [0xB9, 0x5B, 0x53, 0xFA,
> +                          0x24, 0x2B, 0x6E, 0x1D])
> +
> +    CPER_PROC_ITANIUM = guid(0xe429faf1, 0x3cb7, 0x11d4,
> +                             [0xbc, 0xa7, 0x00, 0x80,

To stop people falling down a hole, maybe call out that this
is not the format in the spec which is weird for this one case.

> +                              0xc7, 0x3c, 0x88, 0x81])
> +

> +

> +
> +    CPER_PLATFORM_MEM2 = guid(0x61EC04FC, 0x48E6, 0xD813,
> +                              [0x25, 0xC9, 0x8D, 0xAA,
> +                               0x44, 0x75, 0x0B, 0x12])
Huh. they missed this one in the big spec table but is
there in N.2.6



> +    CPER_PCI_DEV = guid(0xEB5E4685, 0xCA66, 0x4769,
CPER_PCI_COMPONENT would match N.2.9 naming.
If I recall PCI terminology right, component covers a bunch
of things that Device doesn't.


> +                        [0xB6, 0xA2, 0x26, 0x06,
> +                         0x8B, 0x00, 0x13, 0x26])
> +
> +    CPER_FW_ERROR = guid(0x81212A96, 0x09ED, 0x4996,
Another one oddly missing from the big table but not the broken
out sections.  Not our problem but this chunk of the
spec could do with tidying up!

> +                         [0x94, 0x71, 0x8D, 0x72,
> +                          0x9C, 0x8E, 0x69, 0xED])
> +
> +    CPER_DMA_GENERIC = guid(0x5B51FEF7, 0xC79D, 0x4434,
CPER_DMAR
Nothing to do with DMA in general.  All about x86 IOMMUs I think.

> +                            [0x8F, 0x1B, 0xAA, 0x62,
> +                             0xDE, 0x3E, 0x2C, 0x64])
> +

> +
> +    CPER_CXL_PROT_ERR = guid(0x80B9EFB4, 0x52B5, 0x4DE3,
> +                             [0xA7, 0x77, 0x68, 0x78,
> +                              0x4B, 0x77, 0x10, 0x48])

Maybe add the one for FRU Memory poison from the new 2.11 UEFI spec.
This will constantly need updating with new specs so no problem
if you'd rather stick to 2.10 only for now.