# Source code for networkx.readwrite.gml

"""
读取GML格式的图数据。

"GML,即图建模语言,是我们提出的便携式图文件格式。GML的关键特性包括可移植性、简单语法、可扩展性和灵活性。GML文件由层次化的键值对列表组成。图可以带有任意数据结构进行注释。通用文件格式的想法诞生于1995年的图数据会议;这一提案是多次讨论的结果。GML是Graphlet图编辑系统中的标准文件格式。它已被多个其他绘图系统采用和适应。"

GML文件使用7位ASCII编码存储,任何扩展ASCII字符(iso8859-1)以HTML字符实体形式出现。您需要考虑导出的数据如何与不同语言甚至不同Python版本交互。重新从GML导入也是一个考虑因素。

如果不指定 `stringizer` / `destringizer` ,代码能够根据GML规范要求写入 `int` / `float` / `str` / `dict` / `list` 数据。对于写入其他数据类型,以及读取非 `str` 数据,您需要显式提供 `stringizer` / `destringizer` 。

有关GML文件格式的更多文档,请参阅 `GML网站 <https://web.archive.org/web/20190207140002/http://www.fim.uni-passau.de/index.php?id=17297&L=1>`_。

Mark Newman的 `网络数据页面 <http://www-personal.umich.edu/~mejn/netdata/>`_ 上可以找到几个GML格式的示例图。
"""

import html.entities as htmlentitydefs
import re
import warnings
from ast import literal_eval
from collections import defaultdict
from enum import Enum
from io import StringIO
from typing import Any, NamedTuple

import networkx as nx
from networkx.exception import NetworkXError
from networkx.utils import open_file

# Names exported as the public API of this module.
__all__ = ["read_gml", "parse_gml", "generate_gml", "write_gml"]


def escape(text):
    """Escape characters of `text` using XML character references.

    Every character outside the printable ASCII range (space through ``~``),
    as well as the double quote and the ampersand, is replaced with a decimal
    character reference of the form ``&#NNN;``.
    """
    escaped = re.sub(
        '[^ -~]|[&"]',
        lambda found: "&#" + str(ord(found.group(0))) + ";",
        text,
    )
    if isinstance(escaped, str):
        return escaped
    return str(escaped)


def unescape(text):
    """Replace XML character references in `text` with the referenced characters.

    Decimal (``&#34;``), hexadecimal (``&#x22;``) and named (``&quot;``)
    references are recognised. A reference that names an unknown entity or a
    code point that `chr` rejects is left in the text unchanged.
    """

    def resolve(found):
        reference = found.group(0)
        inner = reference[1:-1]  # drop the leading "&" and trailing ";"
        if inner[0] == "#":
            # Numeric character reference, hexadecimal when marked with "x"
            digits = inner[1:]
            if digits[0] == "x":
                codepoint = int(digits[1:], 16)
            else:
                codepoint = int(digits)
        else:
            # Named entity
            codepoint = htmlentitydefs.name2codepoint.get(inner)
            if codepoint is None:
                return reference  # unknown name: leave unchanged
        try:
            return chr(codepoint)
        except (ValueError, OverflowError):
            return reference  # not a valid code point: leave unchanged

    return re.sub("&(?:[0-9A-Za-z]+|#(?:[0-9]+|x[0-9A-Fa-f]+));", resolve, text)


def literal_destringizer(rep):
    """Convert a Python literal to the value it represents.

    Parameters
    ----------
    rep : string
        A Python literal.

    Returns
    -------
    value : object
        The value of the Python literal.

    Raises
    ------
    ValueError
        If `rep` is not a string or is not a Python literal.
    """
    if not isinstance(rep, str):
        raise ValueError(f"{rep!r} is not a string")
    try:
        return literal_eval(rep)
    except (SyntaxError, TypeError, MemoryError, RecursionError) as err:
        # literal_eval already reports malformed nodes as ValueError; the other
        # documented failure modes are normalised here so that callers which
        # only catch ValueError never see a different exception type.
        raise ValueError(f"{rep!r} is not a valid Python literal") from err
@open_file(0, mode="rb")
@nx._dispatchable(graphs=None, returns_graph=True)
def read_gml(path, label="label", destringizer=None):
    """Read graph in GML format from `path`.

    Parameters
    ----------
    path : filename or filehandle
        The filename or filehandle to read from.

    label : string, optional
        If not None, the parsed nodes will be renamed according to node
        attributes indicated by `label`. Default value: 'label'.

    destringizer : callable, optional
        A `destringizer` that recovers values stored as strings in GML. If it
        cannot convert a string to a value, a `ValueError` is raised. Default
        value : None.

    Returns
    -------
    G : NetworkX graph
        The parsed graph.

    Raises
    ------
    NetworkXError
        If the input cannot be parsed or is not ASCII-encoded.

    See Also
    --------
    write_gml, parse_gml
    literal_destringizer

    Notes
    -----
    GML files are stored using a 7-bit ASCII encoding with any extended
    ASCII characters (iso8859-1) appearing as HTML character entities.
    Without specifying a `stringizer`/`destringizer`, the code is capable of
    writing `int`/`float`/`str`/`dict`/`list` data as required by the GML
    specification.  For writing other data types, and for reading data other
    than `str` you need to explicitly supply a `stringizer`/`destringizer`.

    For additional documentation on the GML file format, please see the
    `GML url <https://web.archive.org/web/20190207140002/http://www.fim.uni-passau.de/index.php?id=17297&L=1>`_.

    See the module docstring :mod:`networkx.readwrite.gml` for more details.

    Examples
    --------
    >>> G = nx.path_graph(4)
    >>> nx.write_gml(G, "test.gml")

    GML values are interpreted as strings by default:

    >>> H = nx.read_gml("test.gml")
    >>> H.nodes
    NodeView(('0', '1', '2', '3'))

    When a `destringizer` is provided, GML values are converted to the provided
    type. For example, integer nodes can be recovered as shown below:

    >>> J = nx.read_gml("test.gml", destringizer=int)
    >>> J.nodes
    NodeView((0, 1, 2, 3))

    """

    def filter_lines(lines):
        # The file is opened in binary mode, so each line is decoded here and
        # its trailing newline removed before it reaches the tokenizer.
        for line in lines:
            try:
                line = line.decode("ascii")
            except UnicodeDecodeError as err:
                raise NetworkXError("input is not ASCII-encoded") from err
            if line and line[-1] == "\n":
                line = line[:-1]
            yield line

    G = parse_gml_lines(filter_lines(path), label, destringizer)
    return G
@nx._dispatchable(graphs=None, returns_graph=True)
def parse_gml(lines, label="label", destringizer=None):
    """Parse GML graph from a string or iterable.

    Parameters
    ----------
    lines : string or iterable of strings
        Data in GML format.

    label : string, optional
        If not None, the parsed nodes will be renamed according to node
        attributes indicated by `label`. Default value: 'label'.

    destringizer : callable, optional
        A `destringizer` that recovers values stored as strings in GML. If it
        cannot convert a string to a value, a `ValueError` is raised. Default
        value : None.

    Returns
    -------
    G : NetworkX graph
        The parsed graph.

    Raises
    ------
    NetworkXError
        If the input cannot be parsed, if a bytes line is not ASCII-encoded,
        or if a line of an iterable input contains an embedded newline.

    See Also
    --------
    write_gml, read_gml

    Notes
    -----
    This stores nested GML attributes as dictionaries in the NetworkX graph,
    node, and edge attribute structures.

    GML files are stored using a 7-bit ASCII encoding with any extended
    ASCII characters (iso8859-1) appearing as HTML character entities.
    Without specifying a `stringizer`/`destringizer`, the code is capable of
    writing `int`/`float`/`str`/`dict`/`list` data as required by the GML
    specification.  For writing other data types, and for reading data other
    than `str` you need to explicitly supply a `stringizer`/`destringizer`.

    For additional documentation on the GML file format, please see the
    `GML url <https://web.archive.org/web/20190207140002/http://www.fim.uni-passau.de/index.php?id=17297&L=1>`_.

    See the module docstring :mod:`networkx.readwrite.gml` for more details.
    """

    def decode_line(line):
        if isinstance(line, bytes):
            try:
                # Keep the decoded text; without the assignment the bytes
                # object would be turned into its "b'...'" repr below.
                line = line.decode("ascii")
            except UnicodeDecodeError as err:
                raise NetworkXError("input is not ASCII-encoded") from err
        if not isinstance(line, str):
            line = str(line)
        return line

    def filter_lines(lines):
        if isinstance(lines, str):
            lines = decode_line(lines)
            lines = lines.splitlines()
            yield from lines
        else:
            for line in lines:
                line = decode_line(line)
                if line and line[-1] == "\n":
                    line = line[:-1]
                if line.find("\n") != -1:
                    raise NetworkXError("input line contains newline")
                yield line

    G = parse_gml_lines(filter_lines(lines), label, destringizer)
    return G
class Pattern(Enum):
    """Encodes the index of each token-matching pattern in `tokenize`."""

    KEYS = 0
    REALS = 1
    INTS = 2
    STRINGS = 3
    DICT_START = 4
    DICT_END = 5
    COMMENT_WHITESPACE = 6


class Token(NamedTuple):
    """A token yielded by the tokenizer inside `parse_gml_lines`."""

    category: Pattern  # the pattern that matched; None marks end of input
    value: Any  # converted value (str, int or float); None at end of input
    line: int  # 1-based line number used in error messages
    position: int  # 1-based column used in error messages


# Marker that `clean_dict_value` strips from the front of a repeated key's
# value list, so that a list can be told apart from a scalar value.
LIST_START_VALUE = "_networkx_list_start"


def parse_gml_lines(lines, label, destringizer):
    """Parse GML `lines` into a graph.

    Parameters
    ----------
    lines : iterable of strings
        The GML input, one string per line.

    label : string or None
        If not None and not ``"id"``, the node attribute of that name is
        removed from every node and used to relabel the nodes.

    destringizer : callable or None
        If given, applied to every string value; a `ValueError` raised by it
        leaves the string unchanged.

    Returns
    -------
    G : NetworkX graph
        A `Graph`, `DiGraph`, `MultiGraph` or `MultiDiGraph`, selected by the
        ``directed`` and ``multigraph`` entries of the parsed graph.

    Raises
    ------
    NetworkXError
        If the input cannot be tokenized or parsed, contains no graph or more
        than one graph, or has duplicated or undefined nodes or edges.
    """

    def tokenize():
        patterns = [
            r"[A-Za-z][0-9A-Za-z_]*\b",  # keys
            # reals
            r"[+-]?(?:[0-9]*\.[0-9]+|[0-9]+\.[0-9]*|INF)(?:[Ee][+-]?[0-9]+)?",
            r"[+-]?[0-9]+",  # ints
            r'".*?"',  # strings
            r"\[",  # dict start
            r"\]",  # dict end
            r"#.*$|\s+",  # comments and whitespaces
        ]
        tokens = re.compile("|".join(f"({pattern})" for pattern in patterns))
        lineno = 0
        multilines = []  # entries spread across multiple lines
        for line in lines:
            pos = 0

            # deal with entries spread across multiple lines
            #
            # should we actually have to deal with escaped "s then do it here
            if multilines:
                stripped = line.strip()
                multilines.append(stripped)
                # Test the stripped text: an empty continuation line must not
                # raise IndexError, and trailing whitespace (such as "\r")
                # must not hide the closing quote.
                if stripped.endswith('"'):  # closing multiline entry
                    # multiline entries will be joined by space. cannot
                    # reintroduce newlines as this will break the tokenizer
                    line = " ".join(multilines)
                    multilines = []
                else:  # continued multiline entry
                    lineno += 1
                    continue
            else:
                if line.count('"') == 1:  # opening multiline entry
                    if line.strip()[0] != '"' and line.strip()[-1] != '"':
                        # since we expect something like key "value", the "
                        # should not be found at ends otherwise tokenizer will
                        # pick up the formatting mistake.
                        multilines = [line.rstrip()]
                        lineno += 1
                        continue

            length = len(line)

            while pos < length:
                match = tokens.match(line, pos)
                if match is None:
                    m = f"cannot tokenize {line[pos:]} at ({lineno + 1}, {pos + 1})"
                    raise NetworkXError(m)
                # Exactly one of the top-level groups matched; its index is
                # the index of the pattern and therefore the Pattern value.
                for i, group in enumerate(match.groups()):
                    if group is not None:
                        if i == 0:  # keys
                            value = group.rstrip()
                        elif i == 1:  # reals
                            value = float(group)
                        elif i == 2:  # ints
                            value = int(group)
                        else:
                            value = group
                        if i != 6:  # comments and whitespaces
                            yield Token(Pattern(i), value, lineno + 1, pos + 1)
                        pos += len(group)
                        break
            lineno += 1
        yield Token(None, None, lineno + 1, 1)  # EOF

    def unexpected(curr_token, expected):
        category, value, lineno, pos = curr_token
        value = repr(value) if value is not None else "EOF"
        raise NetworkXError(f"expected {expected}, found {value} at ({lineno}, {pos})")

    def consume(curr_token, category, expected):
        if curr_token.category == category:
            return next(tokens)
        unexpected(curr_token, expected)

    def parse_kv(curr_token):
        # Repeated keys accumulate their values in a list.
        dct = defaultdict(list)
        while curr_token.category == Pattern.KEYS:
            key = curr_token.value
            curr_token = next(tokens)
            category = curr_token.category
            if category == Pattern.REALS or category == Pattern.INTS:
                value = curr_token.value
                curr_token = next(tokens)
            elif category == Pattern.STRINGS:
                value = unescape(curr_token.value[1:-1])
                if destringizer:
                    try:
                        value = destringizer(value)
                    except ValueError:
                        pass
                # Special handling for empty lists and tuples
                if value == "()":
                    value = ()
                if value == "[]":
                    value = []
                curr_token = next(tokens)
            elif category == Pattern.DICT_START:
                curr_token, value = parse_dict(curr_token)
            else:
                # Allow for string convertible id and label values
                if key in ("id", "label", "source", "target"):
                    try:
                        # String convert the token value
                        value = unescape(str(curr_token.value))
                        if destringizer:
                            try:
                                value = destringizer(value)
                            except ValueError:
                                pass
                        curr_token = next(tokens)
                    except Exception:
                        # NOTE(review): this also catches errors raised by the
                        # tokenizer while fetching the next token.
                        msg = (
                            "an int, float, string, '[' or string"
                            + " convertible ASCII value for node id or label"
                        )
                        unexpected(curr_token, msg)
                # Special handling for nan and infinity.  Since the gml language
                # defines unquoted strings as keys, the numeric and string branches
                # are skipped and we end up in this special branch, so we need to
                # convert the current token value to a float for NAN and plain INF.
                # +/-INF are handled in the pattern for 'reals' in tokenize().  This
                # allows labels and values to be nan or infinity, but not keys.
                elif curr_token.value in {"NAN", "INF"}:
                    value = float(curr_token.value)
                    curr_token = next(tokens)
                else:  # Otherwise error out
                    unexpected(curr_token, "an int, float, string or '['")
            dct[key].append(value)

        def clean_dict_value(value):
            if not isinstance(value, list):
                return value
            if len(value) == 1:
                return value[0]
            if value[0] == LIST_START_VALUE:
                return value[1:]
            return value

        dct = {key: clean_dict_value(value) for key, value in dct.items()}
        return curr_token, dct

    def parse_dict(curr_token):
        # dict start
        curr_token = consume(curr_token, Pattern.DICT_START, "'['")
        # dict contents
        curr_token, dct = parse_kv(curr_token)
        # dict end
        curr_token = consume(curr_token, Pattern.DICT_END, "']'")
        return curr_token, dct

    def parse_graph():
        curr_token, dct = parse_kv(next(tokens))
        if curr_token.category is not None:  # EOF
            unexpected(curr_token, "EOF")
        if "graph" not in dct:
            raise NetworkXError("input contains no graph")
        graph = dct["graph"]
        if isinstance(graph, list):
            raise NetworkXError("input contains more than one graph")
        return graph

    tokens = tokenize()
    graph = parse_graph()

    directed = graph.pop("directed", False)
    multigraph = graph.pop("multigraph", False)
    if not multigraph:
        G = nx.DiGraph() if directed else nx.Graph()
    else:
        G = nx.MultiDiGraph() if directed else nx.MultiGraph()
    graph_attr = {k: v for k, v in graph.items() if k not in ("node", "edge")}
    G.graph.update(graph_attr)

    def pop_attr(dct, category, attr, i):
        try:
            return dct.pop(attr)
        except KeyError as err:
            raise NetworkXError(f"{category} #{i} has no {attr!r} attribute") from err

    nodes = graph.get("node", [])
    mapping = {}
    node_labels = set()
    # A single "node" entry is a plain dict rather than a list of dicts.
    for i, node in enumerate(nodes if isinstance(nodes, list) else [nodes]):
        node_id = pop_attr(node, "node", "id", i)
        if node_id in G:
            raise NetworkXError(f"node id {node_id!r} is duplicated")
        if label is not None and label != "id":
            node_label = pop_attr(node, "node", label, i)
            if node_label in node_labels:
                raise NetworkXError(f"node label {node_label!r} is duplicated")
            node_labels.add(node_label)
            mapping[node_id] = node_label
        G.add_node(node_id, **node)

    edges = graph.get("edge", [])
    for i, edge in enumerate(edges if isinstance(edges, list) else [edges]):
        source = pop_attr(edge, "edge", "source", i)
        target = pop_attr(edge, "edge", "target", i)
        if source not in G:
            raise NetworkXError(f"edge #{i} has undefined source {source!r}")
        if target not in G:
            raise NetworkXError(f"edge #{i} has undefined target {target!r}")
        if not multigraph:
            if not G.has_edge(source, target):
                G.add_edge(source, target, **edge)
            else:
                arrow = "->" if directed else "--"
                msg = f"edge #{i} ({source!r}{arrow}{target!r}) is duplicated"
                raise nx.NetworkXError(msg)
        else:
            key = edge.pop("key", None)
            if key is not None and G.has_edge(source, target, key):
                arrow = "->" if directed else "--"
                msg = f"edge #{i} ({source!r}{arrow}{target!r}, {key!r})"
                msg2 = 'Hint: If multigraph add "multigraph 1" to file header.'
                raise nx.NetworkXError(msg + " is duplicated\n" + msg2)
            G.add_edge(source, target, key, **edge)

    if label is not None and label != "id":
        G = nx.relabel_nodes(G, mapping)
    return G
[docs] def literal_stringizer(value): """将 `value` 转换为 GML 表示形式的 Python 字面量。 Parameters ---------- value : object 要转换为 GML 表示形式的 `value` 。 Returns ------- rep : string 表示值的双引号 Python 字面量。不可打印的字符被替换为 XML 字符引用。 Raises ------ ValueError 如果 `value` 无法转换为 GML。 Notes ----- 可以使用 :func:`networkx.readwrite.gml.literal_destringizer` 函数恢复原始值。 """ def stringize(value): if isinstance(value, int | bool) or value is None: if value is True: # GML uses 1/0 for boolean values. buf.write(str(1)) elif value is False: buf.write(str(0)) else: buf.write(str(value)) elif isinstance(value, str): text = repr(value) if text[0] != "u": try: value.encode("latin1") except UnicodeEncodeError: text = "u" + text buf.write(text) elif isinstance(value, float | complex | str | bytes): buf.write(repr(value)) elif isinstance(value, list): buf.write("[") first = True for item in value: if not first: buf.write(",") else: first = False stringize(item) buf.write("]") elif isinstance(value, tuple): if len(value) > 1: buf.write("(") first = True for item in value: if not first: buf.write(",") else: first = False stringize(item) buf.write(")") elif value: buf.write("(") stringize(value[0]) buf.write(",)") else: buf.write("()") elif isinstance(value, dict): buf.write("{") first = True for key, value in value.items(): if not first: buf.write(",") else: first = False stringize(key) buf.write(":") stringize(value) buf.write("}") elif isinstance(value, set): buf.write("{") first = True for item in value: if not first: buf.write(",") else: first = False stringize(item) buf.write("}") else: msg = f"{value!r} cannot be converted into a Python literal" raise ValueError(msg) buf = StringIO() stringize(value) return buf.getvalue()
def generate_gml(G, stringizer=None):
    r"""Generate a single entry of the graph `G` in GML format.

    Parameters
    ----------
    G : NetworkX graph
        The graph to be converted to GML.

    stringizer : callable, optional
        A `stringizer` which converts non-int/non-float/non-dict values into
        strings. If it cannot convert a value into a string, it should raise a
        `ValueError` to indicate that. Default value: None.

    Returns
    -------
    lines: generator of strings
        Lines of GML data. Newlines are not appended.

    Raises
    ------
    NetworkXError
        If `stringizer` cannot convert a value into a string, or the value to
        convert is not a string while `stringizer` is None, or an attribute
        key is not a string matching ``[A-Za-z][0-9A-Za-z_]*``.

    See Also
    --------
    literal_stringizer

    Notes
    -----
    Graph attributes named 'directed', 'multigraph', 'node' or
    'edge', node attributes named 'id' or 'label', edge attributes
    named 'source' or 'target' (or 'key' if `G` is a multigraph)
    are ignored because these attribute names are used to encode the graph
    structure.

    GML files are stored using a 7-bit ASCII encoding with any extended
    ASCII characters (iso8859-1) appearing as HTML character entities.
    Without specifying a `stringizer`/`destringizer`, the code is capable of
    writing `int`/`float`/`str`/`dict`/`list` data as required by the GML
    specification.  For writing other data types, and for reading data other
    than `str` you need to explicitly supply a `stringizer`/`destringizer`.

    For additional documentation on the GML file format, please see the
    `GML url <https://web.archive.org/web/20190207140002/http://www.fim.uni-passau.de/index.php?id=17297&L=1>`_.

    See the module docstring :mod:`networkx.readwrite.gml` for more details.

    Examples
    --------
    >>> G = nx.Graph()
    >>> G.add_node("1")
    >>> print("\n".join(nx.generate_gml(G)))
    graph [
     node [
     id 0
     label "1"
     ]
    ]
    >>> G = nx.MultiGraph([("a", "b"), ("a", "b")])
    >>> print("\n".join(nx.generate_gml(G)))
    graph [
     multigraph 1
     node [
     id 0
     label "a"
     ]
     node [
     id 1
     label "b"
     ]
     edge [
     source 0
     target 1
     key 0
     ]
     edge [
     source 0
     target 1
     key 1
     ]
    ]
    """
    valid_keys = re.compile("^[A-Za-z][0-9A-Za-z_]*$")

    def stringize(key, value, ignored_keys, indent, in_list=False):
        # Keys are validated even when they are subsequently ignored.
        if not isinstance(key, str):
            raise NetworkXError(f"{key!r} is not a string")
        if not valid_keys.match(key):
            raise NetworkXError(f"{key!r} is not a valid key")
        if key in ignored_keys:
            return
        if isinstance(value, int | bool):
            if key == "label":
                yield indent + key + ' "' + str(value) + '"'
            elif value is True:
                # python bool is an instance of int
                yield indent + key + " 1"
            elif value is False:
                yield indent + key + " 0"
            # GML only supports signed 32-bit integers
            elif value < -(2**31) or value >= 2**31:
                yield indent + key + ' "' + str(value) + '"'
            else:
                yield indent + key + " " + str(value)
        elif isinstance(value, float):
            text = repr(value).upper()
            # GML matches INF to keys, so prepend + to INF. Use repr(float(*))
            # instead of string literal to future proof against changes to repr.
            if text == repr(float("inf")).upper():
                text = "+" + text
            else:
                # GML requires that a real literal contain a decimal point, but
                # repr may not output a decimal point when the mantissa is
                # integral and hence needs fixing.
                epos = text.rfind("E")
                if epos != -1 and text.find(".", 0, epos) == -1:
                    text = text[:epos] + "." + text[epos:]
            if key == "label":
                yield indent + key + ' "' + text + '"'
            else:
                yield indent + key + " " + text
        elif isinstance(value, dict):
            yield indent + key + " ["
            next_indent = indent + " "
            for sub_key, sub_value in value.items():
                yield from stringize(sub_key, sub_value, (), next_indent)
            yield indent + "]"
        elif isinstance(value, tuple) and key == "label":
            yield indent + key + f" \"({','.join(repr(v) for v in value)})\""
        elif isinstance(value, list | tuple) and key != "label" and not in_list:
            if len(value) == 0:
                yield indent + key + " " + f'"{value!r}"'
            if len(value) == 1:
                # A marker entry precedes a single-element list so that the
                # key is written more than once.
                yield indent + key + " " + f'"{LIST_START_VALUE}"'
            for val in value:
                yield from stringize(key, val, (), indent, True)
        else:
            if stringizer:
                try:
                    value = stringizer(value)
                except ValueError as err:
                    raise NetworkXError(
                        f"{value!r} cannot be converted into a string"
                    ) from err
            if not isinstance(value, str):
                raise NetworkXError(f"{value!r} is not a string")
            yield indent + key + ' "' + escape(value) + '"'

    multigraph = G.is_multigraph()
    yield "graph ["

    # Output graph attributes
    if G.is_directed():
        yield " directed 1"
    if multigraph:
        yield " multigraph 1"
    ignored_keys = {"directed", "multigraph", "node", "edge"}
    for attr, value in G.graph.items():
        yield from stringize(attr, value, ignored_keys, " ")

    # Output node data
    # Nodes are written with consecutive integer ids; the node itself is
    # written as the "label" entry.
    node_id = dict(zip(G, range(len(G))))
    ignored_keys = {"id", "label"}
    for node, attrs in G.nodes.items():
        yield " node ["
        yield " id " + str(node_id[node])
        yield from stringize("label", node, (), " ")
        for attr, value in attrs.items():
            yield from stringize(attr, value, ignored_keys, " ")
        yield " ]"

    # Output edge data
    ignored_keys = {"source", "target"}
    kwargs = {"data": True}
    if multigraph:
        ignored_keys.add("key")
        kwargs["keys"] = True
    for e in G.edges(**kwargs):
        yield " edge ["
        yield " source " + str(node_id[e[0]])
        yield " target " + str(node_id[e[1]])
        if multigraph:
            yield from stringize("key", e[2], (), " ")
        for attr, value in e[-1].items():
            yield from stringize(attr, value, ignored_keys, " ")
        yield " ]"
    yield "]"
@open_file(1, mode="wb")
def write_gml(G, path, stringizer=None):
    """Write a graph `G` in GML format to the file or file handle `path`.

    Parameters
    ----------
    G : NetworkX graph
        The graph to be converted to GML.

    path : filename or filehandle
        The filename or filehandle to write. Files whose names end with .gz or
        .bz2 will be compressed.

    stringizer : callable, optional
        A `stringizer` which converts non-int/non-float/non-dict values into
        strings. If it cannot convert a value into a string, it should raise a
        `ValueError` to indicate that. Default value: None.

    Raises
    ------
    NetworkXError
        If `stringizer` cannot convert a value into a string, or the value to
        convert is not a string while `stringizer` is None.

    See Also
    --------
    read_gml, generate_gml
    literal_stringizer

    Notes
    -----
    Graph attributes named 'directed', 'multigraph', 'node' or
    'edge', node attributes named 'id' or 'label', edge attributes
    named 'source' or 'target' (or 'key' if `G` is a multigraph)
    are ignored because these attribute names are used to encode the graph
    structure.

    GML files are stored using a 7-bit ASCII encoding with any extended
    ASCII characters (iso8859-1) appearing as HTML character entities.
    Without specifying a `stringizer`/`destringizer`, the code is capable of
    writing `int`/`float`/`str`/`dict`/`list` data as required by the GML
    specification.  For writing other data types, and for reading data other
    than `str` you need to explicitly supply a `stringizer`/`destringizer`.

    Note that while we allow non-standard GML to be read from a file, we make
    sure to write GML format. The lines written are exactly those produced by
    `generate_gml`, which validates attribute names.

    For additional documentation on the GML file format, please see the
    `GML url <https://web.archive.org/web/20190207140002/http://www.fim.uni-passau.de/index.php?id=17297&L=1>`_.

    See the module docstring :mod:`networkx.readwrite.gml` for more details.

    Examples
    --------
    >>> G = nx.path_graph(4)
    >>> nx.write_gml(G, "test.gml")

    Filenames ending in .gz or .bz2 will be compressed.

    >>> nx.write_gml(G, "test.gml.gz")
    """
    # The handle is opened in binary mode, so each generated line is
    # terminated and ASCII-encoded before it is written.
    for gml_line in generate_gml(G, stringizer):
        encoded = (gml_line + "\n").encode("ascii")
        path.write(encoded)