[PATCH 05/14] scripts: Add script to convert relaxNG to protobuf

Rayhan Faizel posted 14 patches 3 months ago
[PATCH 05/14] scripts: Add script to convert relaxNG to protobuf
Posted by Rayhan Faizel 3 months ago
This script converts RelaxNG schemas to equivalent protobuf definition
files. It captures the general structure of the XML schema and tries to
guess the attribute datatypes.

The protobuf definitions give the fuzzers awareness of the XML schema.
The fuzzers will use the protobuf files to mutate protobuf data and
serialize it to XML.

Signed-off-by: Rayhan Faizel <rayhan.faizel@gmail.com>
---
 build-aux/syntax-check.mk   |   1 +
 scripts/meson.build         |   1 +
 scripts/relaxng-to-proto.py | 505 ++++++++++++++++++++++++++++++++++++
 3 files changed, 507 insertions(+)
 create mode 100644 scripts/relaxng-to-proto.py

diff --git a/build-aux/syntax-check.mk b/build-aux/syntax-check.mk
index 0759372b2b..a60e4a8082 100644
--- a/build-aux/syntax-check.mk
+++ b/build-aux/syntax-check.mk
@@ -844,6 +844,7 @@ http_sites += www.inkscape.org
 http_sites += www.innotek.de
 http_sites += www.w3.org
 http_sites += xmlns
+http_sites += relaxng.org
 
 # Links in licenses
 http_sites += scripts.sil.org
diff --git a/scripts/meson.build b/scripts/meson.build
index 2798e302ab..7249346e45 100644
--- a/scripts/meson.build
+++ b/scripts/meson.build
@@ -32,6 +32,7 @@ scripts = [
   'mock-noinline.py',
   'prohibit-duplicate-header.py',
   'qemu-replies-tool.py',
+  'relaxng-to-proto.py',
 ]
 
 foreach name : scripts
diff --git a/scripts/relaxng-to-proto.py b/scripts/relaxng-to-proto.py
new file mode 100644
index 0000000000..f13d6f7e40
--- /dev/null
+++ b/scripts/relaxng-to-proto.py
@@ -0,0 +1,505 @@
+#!/usr/bin/env python3
+
+import re
+import sys
+import xml.etree.ElementTree as ET
+import argparse
+
+# ElementTree nodes of every <define> (and <start>, as "rng_entrypoint"), by name
+define_table = {}
+
+# Intermediate trees built by parse_define() for the requested defines, by name
+define_trees = {}
+
+relaxng_ns = "{http://relaxng.org/ns/structure/1.0}"
+
+integer_refs = ["positiveInteger", "unsignedInt", "uint8", "uint16", "uint24", "uint32", "hexuint"]
+integer_datatypes = ["positiveInteger", "unsignedInt", "int", "long", "unsignedLong", "integer"]
+
+# Override the type/values of an attribute or text node based on the <ref> name
+custom_ref_table = {
+                    "virYesNo": {"type": "bool"},
+                    "virOnOff": {"type": "Switch"},
+                    "ipAddr": {"type": "IPAddr"},
+                    "ipv4Addr": {"type": "IPAddr"},
+                    "ipv6Addr": {"type": "IPAddr"},
+                    "diskTargetDev": {"type": "TargetDev"},
+                    "UUID": {"type": "DummyUUID"},
+                    "usbIdDefault": {"values": ["-1",]},
+                    "usbClass": {"type": "uint32"},
+                    "usbId": {"type": "uint32"},
+                    "usbVersion": {"type": "uint32"},
+                    "usbAddr": {"type": "uint32"},
+                    "usbPort": {"type": "uint32"},
+                    "virtioserialPort": {"type": "uint32"},
+                    "timeDelta": {"type": "uint32"},
+                    "absFilePath": {"type": "DummyPath"},
+                    "filePath": {"type": "DummyPath"},
+                    "absDirPath": {"type": "DummyPath"},
+                    "dirPath": {"type": "DummyPath"},
+                    "cpuset": {"type": "CPUSet"},
+                    "pciSlot": {"type": "uint32"},
+                    "pciFunc": {"type": "uint32"},
+                    "ccidSlot": {"type": "uint32"},
+                    "ccwCssidRange": {"type": "uint32"},
+                    "ccwSsidRange": {"type": "uint32"},
+                    "ccwDevnoRange": {"type": "uint32"},
+                    "driveController": {"type": "uint32"},
+                    "driveBus": {"type": "uint32"},
+                    "driveSCSITarget": {"type": "uint32"},
+                    "driveUnit": {"type": "uint32"},
+                    "irq": {"type": "uint32"},
+                    "iobase": {"type": "uint32"},
+                    "uniMacAddr": {"type": "MacAddr"},
+                   }
+
+net_model_names = ["virtio", "virtio-transitional", "virtio-non-transitional", "e1000", "e1000e", "igb",
+                   "rtl8139", "netfront", "usb-net", "spapr-vlan", "lan9118", "scm91c111", "vlance", "vmxnet",
+                   "vmxnet2", "vmxnet3", "Am79C970A", "Am79C973", "82540EM", "82545EM", "82543GC"]
+
+# Override the type/values of an attribute based on its dotted XML path
+attr_path_table = {
+                    "domain.devices.interface.model.type": {"values": net_model_names},
+                  }
+
+# Per-path tree overrides. Tag paths end with a dot while attribute paths don't.
+xml_modify_path = {
+                    "domain.devices.smartcard.certificate.": {"repeated": True}
+                  }
+
+# Register <element> 'element_name' as a child tag of 'tree'. Missing
+# containers are created on demand; an existing subtree is left untouched.
+def tree_add_tag(tree, element_name, set_repeat):
+    child_tags = tree.setdefault("tags", {})
+    subtree = child_tags.setdefault(element_name, {})
+
+    # The flag is only ever set, never cleared by a later call.
+    if set_repeat:
+        subtree["repeated"] = True
+
+# Ensure tree["attributes"][attrib_name] exists with a 'type_list' and a
+# 'value_list', then let parse_attribute() fill them from the <attribute>
+# node 'child'. 'path' is the dotted path of the enclosing element.
+def tree_add_attribute(tree, child, attrib_name, path):
+    attributes = tree.setdefault("attributes", {})
+    attribute_tree = attributes.setdefault(attrib_name, {})
+
+    # Existing lists are reused, so repeated definitions accumulate.
+    for list_name in ("type_list", "value_list"):
+        attribute_tree.setdefault(list_name, [])
+
+    # The attribute path is the element path plus the attribute name.
+    attr_path = path + attrib_name
+    parse_attribute(child, attrib_name, attribute_tree, attr_path)
+
+# Map the "type" of a <data> node to a protobuf type: the types listed in
+# integer_datatypes become uint32, everything else a DummyString.
+def parse_datatype(root):
+    if root.attrib["type"] in integer_datatypes:
+        return "uint32"
+    return "DummyString"
+
+# Parse <ref>, which may point to either a primitive data type, a type
+# defined in custom_ref_table or simply an XML tree pointer.
+#
+# Returns {"type": "uint32"} or the custom_ref_table entry for overridden
+# refs, {"tree": <define node>} otherwise. The define is only looked up in
+# the latter case, so overridden refs need no <define>; KeyError if unknown.
+def parse_ref_node(ref_node):
+    ref_name = ref_node.attrib["name"]
+    if ref_name in integer_refs:
+        return {"type": "uint32"}
+    if ref_name in custom_ref_table:
+        return custom_ref_table[ref_name]
+    return {"tree": define_table[ref_name]}
+
+def add_to_attr_list(l, new_val):
+    # Lists double as ordered sets: first occurrence wins, no duplicates.
+    l.extend([] if new_val in l else [new_val])
+
+# Merge a ref/path override entry into an attribute's lists: "types" (or,
+# failing that, the single "type") extends type_list and "values" extends
+# value_list. Entries already present are not added twice.
+def add_ref_type_to_attr_lists(attr_type, value_list, type_list):
+    if "types" in attr_type:
+        new_types = attr_type["types"]
+    else:
+        new_types = [attr_type["type"]] if "type" in attr_type else []
+
+    for target, items in ((type_list, new_types), (value_list, attr_type.get("values", []))):
+        for item in items:
+            add_to_attr_list(target, item)
+
+# Collect every alternative allowed inside an attribute's <choice>.
+#
+# <value> children extend value_list; <data>, <text> and overridden <ref>
+# children extend type_list. Other <ref>s are followed into their <define>
+# and any other child element is searched recursively.
+def parse_attribute_choices(attribute_node, value_list, type_list):
+    for node in attribute_node:
+        kind = node.tag
+
+        if kind == relaxng_ns + "value":
+            add_to_attr_list(value_list, node.text)
+        elif kind == relaxng_ns + "data":
+            add_to_attr_list(type_list, parse_datatype(node))
+        elif kind == relaxng_ns + "text":
+            add_to_attr_list(type_list, "DummyString")
+        elif kind == relaxng_ns + "ref":
+            resolved = parse_ref_node(node)
+            if "tree" in resolved:
+                parse_attribute_choices(resolved["tree"], value_list, type_list)
+            else:
+                add_ref_type_to_attr_lists(resolved, value_list, type_list)
+        else:
+            parse_attribute_choices(node, value_list, type_list)
+
+# Fill an attribute tree from an <attribute> node (or from the <define> a
+# <ref> inside it points to).
+#
+# An attribute tree holds two lists that keep growing while the schema is
+# walked:
+# 1. 'type_list': protobuf data types (Eg: uint32, DummyString, etc.)
+# 2. 'value_list': literal values, later emitted as an enum.
+#
+# Only the first child of 'root' is inspected. Raises ValueError when that
+# child is not one of the patterns handled below.
+def parse_attribute(root, attribute_name, attribute_tree, path):
+    type_list = attribute_tree["type_list"]
+    value_list = attribute_tree["value_list"]
+
+    # A path override replaces whatever the schema says about this
+    # attribute, so the node itself is not even looked at.
+    if path in attr_path_table:
+        add_ref_type_to_attr_lists(attr_path_table[path], value_list, type_list)
+        return
+
+    # If there is nothing in <attribute>, assume a generic string.
+    if len(root) == 0:
+        add_to_attr_list(type_list, "DummyString")
+        return
+
+    first = root[0]
+    kind = first.tag
+
+    if kind == relaxng_ns + "choice":
+        parse_attribute_choices(first, value_list, type_list)
+    elif kind == relaxng_ns + "value":
+        # A lone <value> is an enum with a single member.
+        add_to_attr_list(value_list, first.text)
+    elif kind == relaxng_ns + "data":
+        # Primitive datatypes map to protobuf types directly.
+        add_to_attr_list(type_list, parse_datatype(first))
+    elif kind == relaxng_ns + "text":
+        # <text> is simply a generic string.
+        add_to_attr_list(type_list, "DummyString")
+    elif kind == relaxng_ns + "ref":
+        resolved = parse_ref_node(first)
+        if "tree" in resolved:
+            # Not an overridden type: continue inside the referenced <define>.
+            parse_attribute(resolved["tree"], attribute_name, attribute_tree, path)
+        else:
+            add_ref_type_to_attr_lists(resolved, value_list, type_list)
+    else:
+        # We should never reach here
+        raise ValueError(f"Attribute {attribute_name} has unknown datatype")
+
+# Make sure 'tree' has a "text" entry (the attribute-like tree describing
+# the element's XML text node), without touching an existing one.
+def initialize_text_tree(tree):
+    tree.setdefault("text", {"value_list": [], "type_list": []})
+
+# Parse <define> and store data in intermediate tree.
+#
+# An intermediate tree will consist of
+# 1. 'tags': Dict of nested tag trees, which may contain other tags or attributes.
+# 2. 'attributes': Dict of attribute trees
+# 3. 'text': Similar in structure to attribute tree, representing an XML text node.
+#
+# 'ref_traverse' lists the <define> nodes currently being expanded (used to
+# cut reference cycles), 'path' is the dotted path of the element owning
+# 'tree' and 'set_repeat' marks direct child elements as repeated.
+def parse_define(root, tree, ref_traverse, path="", set_repeat=False):
+    # TODO: Allow overriding more stuff when required
+    if "repeated" in xml_modify_path.get(path, {}):
+        tree["repeated"] = xml_modify_path[path]["repeated"]
+
+    for child in root:
+        tag = child.tag
+        name = child.attrib.get("name")
+
+        # Handle <element> tags which will be represented as T_ fields in
+        # the protobuf. Elements without a "name" attribute are skipped.
+        if tag == relaxng_ns + "element":
+            if name is None:
+                continue
+
+            tree_add_tag(tree, name, set_repeat)
+            parse_define(child, tree["tags"][name], ref_traverse, path + name + ".")
+
+        # Handle <attribute> tags which will be represented as A_ fields in
+        # the protobuf. Like elements, attributes without a "name" attribute
+        # cannot be mapped to a field and are skipped instead of crashing.
+        elif tag == relaxng_ns + "attribute":
+            if name is None:
+                continue
+
+            tree_add_attribute(tree, child, name, path)
+
+        # <ref> points to another <define> which is recursively traversed.
+        elif tag == relaxng_ns + "ref":
+            # If ref encapsulates datatype, generate V_ field instead of traversing inside
+            ref_parse = parse_ref_node(child)
+            if "tree" not in ref_parse:
+                initialize_text_tree(tree)
+                add_ref_type_to_attr_lists(ref_parse, tree["text"]["value_list"], tree["text"]["type_list"])
+                continue
+
+            # Handle infinitely recursive refs: skip defines already being expanded.
+            ref_tree = ref_parse["tree"]
+            if ref_tree not in ref_traverse:
+                parse_define(ref_tree, tree, ref_traverse + [ref_tree], path, set_repeat)
+
+        # If <oneOrMore> or <zeroOrMore> is used,
+        # immediate elements under it will have 'repeated' specifier in the
+        # final protobuf.
+        elif tag in (relaxng_ns + "oneOrMore", relaxng_ns + "zeroOrMore"):
+            parse_define(child, tree, ref_traverse, path, True)
+
+        # <value>, <data> or <text> residing outside of <attribute> are
+        # XML text nodes, represented by V_ fields.
+        elif tag == relaxng_ns + "value":
+            initialize_text_tree(tree)
+            add_to_attr_list(tree["text"]["value_list"], child.text)
+        elif tag == relaxng_ns + "data":
+            initialize_text_tree(tree)
+            add_to_attr_list(tree["text"]["type_list"], parse_datatype(child))
+        elif tag == relaxng_ns + "text":
+            initialize_text_tree(tree)
+            add_to_attr_list(tree["text"]["type_list"], "DummyString")
+
+        # Any other child element is traversed as if it were transparent.
+        else:
+            parse_define(child, tree, ref_traverse, path, set_repeat)
+
+# Find all <define> tags and store them to resolve <ref> tags later.
+# <start> is stored under the name "rng_entrypoint".
+#
+# <include> hrefs are resolved relative to the including schema, falling
+# back to the historical ../src/conf/schemas/ location if not found there.
+def get_defines(schema_path):
+    import os.path
+
+    schema_dir = os.path.dirname(schema_path)
+
+    for child in ET.parse(schema_path).getroot():
+        if child.tag == relaxng_ns + "start":
+            define_table["rng_entrypoint"] = child
+        elif child.tag == relaxng_ns + "define":
+            define_table[child.attrib["name"]] = child
+        elif child.tag == relaxng_ns + "include":
+            include_path = os.path.join(schema_dir, child.attrib["href"])
+            if not os.path.exists(include_path):
+                include_path = f"../src/conf/schemas/{child.attrib['href']}"
+            get_defines(include_path)
+
+def padding(text, level):
+    return "".join(["    " * level, text])  # four spaces per nesting level
+
+# Generate the body of a protobuf enum from tree["values"].
+#
+# Values are turned into valid, not yet used identifiers; 'scope' holds the
+# names taken in the enclosing message and is updated. Whenever a name
+# differs from the XML value, the latter is kept in a real_value option.
+def enum_to_proto(tree, level, scope):
+    # NOTE(review): presumably these clash with predefined C/C++ macros.
+    restricted_words = ["unix", "linux"]
+    lines = []
+
+    for enum_index, value in enumerate(tree["values"]):
+        # An empty <value/> has no text at all (None): treat it as "".
+        real_value = value or ""
+        name = re.sub(r"[^a-zA-Z0-9_]", "_", real_value)
+
+        # Identifiers can be neither empty nor start with a digit.
+        if not name or name[0].isdigit():
+            name = "_" + name
+        if name in restricted_words:
+            name = "const_" + name
+        while name in scope:
+            name = "_" + name
+        scope.add(name)
+
+        option = f" [(real_value) = '{real_value}']" if name != real_value else ""
+        lines.append(padding(f"{name} = {enum_index}{option};\n", level))
+
+    return "".join(lines)
+
+# Generate oneof protobuf containing one field per possible datatype (the
+# enum for literal values, if any, is declared first). Returns the text
+# and the last field number used.
+def oneof_to_proto(tree, attribute, protobuf_index, level, proto_opt, scope):
+    types = tree["types"]
+    chunks = []
+
+    if "enum" in types:
+        chunks.append(padding(f"enum {attribute}Enum {{\n", level))
+        chunks.append(enum_to_proto(types["enum"], level + 1, scope))
+        chunks.append(padding("}\n", level))
+
+    chunks.append(padding(f"oneof {attribute}Option {{\n", level))
+    for optnum, datatype in enumerate(types):
+        field_type = f"{attribute}Enum" if datatype == "enum" else datatype
+        field = f"{field_type} A_OPT{optnum:02d}_{attribute} = {protobuf_index + optnum}{proto_opt};\n"
+        chunks.append(padding(field, level + 1))
+    chunks.append(padding("}\n", level))
+
+    return ("".join(chunks), protobuf_index + len(types) - 1)
+
+# Given an attribute tree with type_list and value_list,
+# determine how the protobuf field must be generated, i.e.
+# what field type it is and if it can take on multiple types.
+#
+# Returns {"type": <datatype>} for a single datatype,
+# {"type": "enum", "values": [...]} when only literal values are known, or
+# {"type": "oneof", "types": {...}} when several alternatives exist.
+#
+# A tree with no information at all is treated as a generic string.
+def generate_attribute_type(attribute_tree):
+    type_list = attribute_tree["type_list"]
+    value_list = attribute_tree["value_list"]
+
+    if not value_list:
+        # Nothing known at all: fall back to a generic string rather than
+        # emitting an empty (invalid) oneof.
+        if not type_list:
+            return {"type": "DummyString"}
+        if len(type_list) == 1:
+            return {"type": type_list[0]}
+    elif not type_list:
+        return {"type": "enum", "values": value_list}
+
+    # Two or more alternatives (the enum values count as one of them):
+    # the field must be a oneof in the protobuf.
+    alternatives = {datatype: {"type": datatype} for datatype in type_list}
+    if value_list:
+        alternatives["enum"] = {"type": "enum", "values": value_list}
+
+    return {"type": "oneof", "types": alternatives}
+
+# Convert intermediate tree to protobuf.
+#
+# Attributes become A_ fields numbered from 1. The text node (V_value) and
+# the child elements (nested <name>Tag messages plus T_ fields) share the
+# numbers from 10000 on. At level 0 only the messages are emitted.
+def define_tree_to_proto(tree, level):
+    invalid_chars = "[^a-zA-Z0-9_]"
+
+    # Due to how protobuf scoping works, we can't have the same enum identifiers
+    # under the same message. We need to keep track of the scope ourselves.
+    current_scope = set()
+    chunks = []
+
+    protobuf_index = 1
+    for attribute, attribute_tree in tree.get("attributes", {}).items():
+        # Mangle names protobuf cannot express, remembering the XML one.
+        renamed_attr = re.sub(invalid_chars, "_", attribute)
+        proto_opt = ""
+        if renamed_attr != attribute:
+            proto_opt = f" [(real_name) = '{attribute}']"
+
+        attribute_type = generate_attribute_type(attribute_tree)
+        datatype = attribute_type["type"]
+        field_type = datatype
+
+        if datatype == "oneof":
+            # Uses one field number per alternative; returns the last one.
+            oneof_proto, protobuf_index = oneof_to_proto(attribute_type, renamed_attr, protobuf_index,
+                                                         level, proto_opt, current_scope)
+            chunks.append(oneof_proto)
+        else:
+            if datatype == "enum":
+                field_type = f"{renamed_attr}Enum"
+                chunks.append(padding(f"enum {field_type} {{\n", level))
+                chunks.append(enum_to_proto(attribute_type, level + 1, current_scope))
+                chunks.append(padding("}\n", level))
+            chunks.append(padding(f"optional {field_type} A_{renamed_attr} = {protobuf_index}{proto_opt};\n", level))
+
+        protobuf_index += 1
+
+    # Text and tags are numbered in their own range, apart from attributes.
+    protobuf_tag_index = 10000
+
+    if "text" in tree:
+        # Note that if both V_ and T_ fields are present,  V_ will be favoured
+        # if its presence returns true (since it's optional), otherwise T_ fields
+        # will be used.
+        text_type = generate_attribute_type(tree["text"])
+        datatype = text_type["type"]
+
+        if datatype == "oneof":
+            print("WARN: oneof of V_ not yet supported!")
+        else:
+            if datatype == "enum":
+                datatype = "ValueEnum"
+                chunks.append(padding("enum ValueEnum {\n", level))
+                chunks.append(enum_to_proto(text_type, level + 1, current_scope))
+                chunks.append(padding("}\n", level))
+            chunks.append(padding(f"optional {datatype} V_value = {protobuf_tag_index};\n", level))
+
+        protobuf_tag_index += 1
+
+    for tag, tag_tree in tree.get("tags", {}).items():
+        renamed_tag = re.sub(invalid_chars, "_", tag)
+        proto_opt = ""
+        if renamed_tag != tag:
+            proto_opt = f" [(real_name) = '{tag}']"
+
+        chunks.append(padding(f"message {renamed_tag}Tag {{\n", level))
+        chunks.append(define_tree_to_proto(tag_tree, level + 1))
+        chunks.append(padding("}\n", level))
+
+        # The top level only declares messages; nothing contains them.
+        if level != 0:
+            specifier = "repeated" if tag_tree.get("repeated", False) else "optional"
+            chunks.append(padding(f"{specifier} {renamed_tag}Tag T_{renamed_tag} = {protobuf_tag_index}{proto_opt};\n", level))
+
+        protobuf_tag_index += 1
+
+    return "".join(chunks)
+
+parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
+                                 description="RelaxNG schema to protobuf converter")
+parser.add_argument('rngfile', help='Specify .rng file to process')
+parser.add_argument('protofile', help='Specify .proto file to output')
+parser.add_argument('--defines', nargs='*', default=[ 'rng_entrypoint' ],
+                    help='Specify defines to be converted to equivalent protobuf messages',)
+
+args = parser.parse_args()
+
+allowed_defines = args.defines
+infile = args.rngfile
+outfile = args.protofile
+
+get_defines(infile)
+
+# Report a bad --defines as a usage error rather than a KeyError traceback.
+unknown_defines = [name for name in allowed_defines if name not in define_table]
+if unknown_defines:
+    parser.error(f"unknown define(s) in {infile}: {', '.join(unknown_defines)}")
+
+for define_name in allowed_defines:
+    define_trees[define_name] = {}
+    parse_define(define_table[define_name], define_trees[define_name], [])
+
+prologue = """\
+syntax = 'proto2';
+package libvirt;
+
+import 'xml_datatypes.proto';
+"""
+
+with open(outfile, "w") as out_file:
+    out_file.write(prologue)
+    for define_name in allowed_defines:
+        out_file.write(define_tree_to_proto(define_trees[define_name], 0) + "\n")
-- 
2.34.1