Source code for networkx.algorithms.isomorphism.ismags

"""
ISMAGS算法
============

提供了一个ISMAGS算法的Python实现。[1]_

它能够找到两个图之间的(子图)同构,考虑到子图的对称性。在大多数情况下,VF2算法比这个实现更快(至少在小图上),但在某些情况下,存在指数数量的同构是关于对称性等价的。在这种情况下,ISMAGS算法将为每个对称组提供一个解决方案。

```python
>>> petersen = nx.petersen_graph()
>>> ismags = nx.isomorphism.ISMAGS(petersen, petersen)
>>> isomorphisms = list(ismags.isomorphisms_iter(symmetry=False))
>>> len(isomorphisms)
120
>>> isomorphisms = list(ismags.isomorphisms_iter(symmetry=True))
>>> answer = [{0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 8: 8, 9: 9}]
>>> answer == isomorphisms
True
```

In addition, this implementation also provides an interface for finding the
largest common induced subgraph [2]_ between any two graphs, again taking
symmetry into account. Given `graph` and `subgraph`, the algorithm will remove
nodes from `subgraph` until `subgraph` is isomorphic to a subgraph of `graph`.
Since only the symmetry of `subgraph` is taken into account, it is worth
thinking about how you provide your graphs:

```python
>>> graph1 = nx.path_graph(4)
>>> graph2 = nx.star_graph(3)
>>> ismags = nx.isomorphism.ISMAGS(graph1, graph2)
>>> ismags.is_isomorphic()
False
>>> largest_common_subgraph = list(ismags.largest_common_subgraph())
>>> answer = [{1: 0, 0: 1, 2: 2}, {2: 0, 1: 1, 3: 2}]
>>> answer == largest_common_subgraph
True
>>> ismags2 = nx.isomorphism.ISMAGS(graph2, graph1)
>>> largest_common_subgraph = list(ismags2.largest_common_subgraph())
>>> answer = [
...     {1: 0, 0: 1, 2: 2},
...     {1: 0, 0: 1, 3: 2},
...     {2: 0, 0: 1, 1: 2},
...     {2: 0, 0: 1, 3: 2},
...     {3: 0, 0: 1, 1: 2},
...     {3: 0, 0: 1, 2: 2},
... ]
>>> answer == largest_common_subgraph
True
```

However, when not taking symmetry into account, it doesn't matter:

```python
>>> largest_common_subgraph = list(ismags.largest_common_subgraph(symmetry=False))
>>> answer = [
...     {1: 0, 0: 1, 2: 2},
...     {1: 0, 2: 1, 0: 2},
...     {2: 0, 1: 1, 3: 2},
...     {2: 0, 3: 1, 1: 2},
...     {1: 0, 0: 1, 2: 3},
...     {1: 0, 2: 1, 0: 3},
...     {2: 0, 1: 1, 3: 3},
...     {2: 0, 3: 1, 1: 3},
...     {1: 0, 0: 2, 2: 3},
...     {1: 0, 2: 2, 0: 3},
...     {2: 0, 1: 2, 3: 3},
...     {2: 0, 3: 2, 1: 3},
... ]
>>> answer == largest_common_subgraph
True
>>> largest_common_subgraph = list(ismags2.largest_common_subgraph(symmetry=False))
>>> answer = [
...     {1: 0, 0: 1, 2: 2},
...     {1: 0, 0: 1, 3: 2},
...     {2: 0, 0: 1, 1: 2},
...     {2: 0, 0: 1, 3: 2},
...     {3: 0, 0: 1, 1: 2},
...     {3: 0, 0: 1, 2: 2},
...     {1: 1, 0: 2, 2: 3},
...     {1: 1, 0: 2, 3: 3},
...     {2: 1, 0: 2, 1: 3},
...     {2: 1, 0: 2, 3: 3},
...     {3: 1, 0: 2, 1: 3},
...     {3: 1, 0: 2, 2: 3},
... ]
>>> answer == largest_common_subgraph
True
```

Notes
-----
- The current implementation works for undirected graphs only. The algorithm
  in general should work for directed graphs as well though.
- Node keys for both provided graphs need to be fully orderable as well as
  hashable.
- Node and edge equality is assumed to be transitive: if A is equal to B, and
  B is equal to C, then A is equal to C.

References
----------
.. [1] M. Houbraken, S. Demeyer, T. Michoel, P. Audenaert, D. Colle,
   M. Pickavet, "The Index-Based Subgraph Matching Algorithm with General
   Symmetries (ISMAGS): Exploiting Symmetry for Faster Subgraph Enumeration",
   PLoS One 9(5): e97896, 2014.
   https://doi.org/10.1371/journal.pone.0097896
.. [2] https://en.wikipedia.org/wiki/Maximum_common_induced_subgraph
"""

__all__ = ["ISMAGS"]

import itertools
from collections import Counter, defaultdict
from functools import reduce, wraps


def are_all_equal(iterable):
    """Returns ``True`` if and only if all elements in `iterable` are equal;
    and ``False`` otherwise.

    Parameters
    ----------
    iterable: collections.abc.Iterable
        The container whose elements will be checked.

    Returns
    -------
    bool
        ``True`` iff all elements in `iterable` compare equal, ``False``
        otherwise.
    """
    try:
        shape = iterable.shape
    except AttributeError:
        pass
    else:
        if len(shape) > 1:
            message = "The function does not works on multidimensional arrays."
            raise NotImplementedError(message) from None

    iterator = iter(iterable)
    first = next(iterator, None)
    return all(item == first for item in iterator)
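
A quick illustration of the edge cases of ``are_all_equal`` (the function is
re-stated here in condensed form, without the array check, so the snippet runs
standalone):

```python
def are_all_equal(iterable):
    # Re-stated from above so this snippet runs standalone.
    iterator = iter(iterable)
    first = next(iterator, None)
    return all(item == first for item in iterator)

assert are_all_equal([1, 1, 1])      # all elements compare equal
assert not are_all_equal([1, 2, 1])  # one differing element
assert are_all_equal([])             # an empty iterable is vacuously equal
assert are_all_equal(["a"])          # so is a single element
```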


def make_partitions(items, test):
    """Partitions items into sets based on the outcome of
    ``test(item1, item2)``. Pairs of items for which `test` returns `True` end
    up in the same set.

    Parameters
    ----------
    items : collections.abc.Iterable[collections.abc.Hashable]
        Items to partition
    test : collections.abc.Callable[collections.abc.Hashable, collections.abc.Hashable]
        A function that will be called with 2 arguments, taken from items.
        Should return `True` if those 2 items need to end up in the same
        partition, and `False` otherwise.

    Returns
    -------
    list[set]
        A list of sets, with each set containing part of the items in `items`,
        such that ``all(test(*pair) for pair in itertools.combinations(set, 2))
        == True``

    Notes
    -----
    The function `test` is assumed to be transitive: if ``test(a, b)`` and
    ``test(b, c)`` return ``True``, then ``test(a, c)`` must also be ``True``.
    """
    partitions = []
    for item in items:
        for partition in partitions:
            p_item = next(iter(partition))
            if test(item, p_item):
                partition.add(item)
                break
        else:  # No break
            partitions.append({item})
    return partitions
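
For example, partitioning integers by parity with a transitive test (the
function is re-stated here so the snippet runs standalone):

```python
def make_partitions(items, test):
    # Re-stated from above so this snippet runs standalone.
    partitions = []
    for item in items:
        for partition in partitions:
            p_item = next(iter(partition))
            if test(item, p_item):
                partition.add(item)
                break
        else:  # No break
            partitions.append({item})
    return partitions

# Group 0..5 by parity; "same parity" is transitive, as required.
parts = make_partitions(range(6), lambda a, b: a % 2 == b % 2)
assert parts == [{0, 2, 4}, {1, 3, 5}]
```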


def partition_to_color(partitions):
    """Creates a dict that maps each item in each partition to the index of
    the partition to which it belongs.

    Parameters
    ----------
    partitions: collections.abc.Sequence[collections.abc.Iterable]
        As returned by :func:`make_partitions`.

    Returns
    -------
    dict
    """
    colors = {}
    for color, keys in enumerate(partitions):
        for key in keys:
            colors[key] = color
    return colors
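
Continuing the parity example, the two partitions become colors 0 and 1 (the
function is re-stated here so the snippet runs standalone):

```python
def partition_to_color(partitions):
    # Re-stated from above so this snippet runs standalone.
    colors = {}
    for color, keys in enumerate(partitions):
        for key in keys:
            colors[key] = color
    return colors

colors = partition_to_color([{0, 2, 4}, {1, 3, 5}])
assert colors == {0: 0, 2: 0, 4: 0, 1: 1, 3: 1, 5: 1}
```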


def intersect(collection_of_sets):
    """Given a collection of sets, returns the intersection of those sets.

    Parameters
    ----------
    collection_of_sets: collections.abc.Collection[set]
        A collection of sets.

    Returns
    -------
    set
        An intersection of all sets in `collection_of_sets`. Will have the
        same type as the item initially taken from `collection_of_sets`.
    """
    collection_of_sets = list(collection_of_sets)
    first = collection_of_sets.pop()
    out = reduce(set.intersection, collection_of_sets, set(first))
    return type(first)(out)


class ISMAGS:
    """Implements the ISMAGS subgraph matching algorithm. [1]_ ISMAGS stands
    for "Index-based Subgraph Matching Algorithm with General Symmetries". As
    the name implies, it is symmetry aware and will only generate non-symmetric
    isomorphisms.

    Notes
    -----
    The implementation imposes additional conditions compared to the VF2
    algorithm on the graphs provided and the comparison functions
    (:attr:`node_equality` and :attr:`edge_equality`):

     - Node keys in both graphs must be orderable as well as hashable.
     - Equality must be transitive: if A is equal to B, and B is equal to C,
       then A must be equal to C.

    Attributes
    ----------
    graph: networkx.Graph
    subgraph: networkx.Graph
    node_equality: collections.abc.Callable
        The function called to see if two nodes should be considered equal.
        Its signature looks like this:
        ``f(graph1: networkx.Graph, node1, graph2: networkx.Graph, node2) -> bool``.
        `node1` is a node in `graph1`, and `node2` a node in `graph2`.
        Constructed from the argument `node_match`.
    edge_equality: collections.abc.Callable
        The function called to see if two edges should be considered equal.
        Its signature looks like this:
        ``f(graph1: networkx.Graph, edge1, graph2: networkx.Graph, edge2) -> bool``.
        `edge1` is an edge in `graph1`, and `edge2` an edge in `graph2`.
        Constructed from the argument `edge_match`.

    References
    ----------
    .. [1] M. Houbraken, S. Demeyer, T. Michoel, P. Audenaert, D. Colle,
       M. Pickavet, "The Index-Based Subgraph Matching Algorithm with General
       Symmetries (ISMAGS): Exploiting Symmetry for Faster Subgraph
       Enumeration", PLoS One 9(5): e97896, 2014.
       https://doi.org/10.1371/journal.pone.0097896
    """
    def __init__(self, graph, subgraph, node_match=None, edge_match=None, cache=None):
        """
        Parameters
        ----------
        graph: networkx.Graph
            The graph.
        subgraph: networkx.Graph
            The subgraph.
        node_match: collections.abc.Callable or None
            Function used to determine whether two nodes are equivalent. Its
            signature should look like ``f(n1: dict, n2: dict) -> bool``, with
            `n1` and `n2` node property dicts. See also
            :func:`~networkx.algorithms.isomorphism.categorical_node_match` and
            friends. If `None`, all nodes are considered equal.
        edge_match: collections.abc.Callable or None
            Function used to determine whether two edges are equivalent. Its
            signature should look like ``f(e1: dict, e2: dict) -> bool``, with
            `e1` and `e2` edge property dicts. See also
            :func:`~networkx.algorithms.isomorphism.categorical_edge_match` and
            friends. If `None`, all edges are considered equal.
        cache: collections.abc.Mapping
            A cache used for caching graph symmetries.
        """
        # TODO: graph and subgraph setter methods that invalidate the caches.
        # TODO: allow for precomputed partitions and colors
        self.graph = graph
        self.subgraph = subgraph
        self._symmetry_cache = cache
        # Naming conventions are taken from the original paper. For your
        # sanity:
        #   sg: subgraph
        #   g: graph
        #   e: edge(s)
        #   n: node(s)
        # So: sgn means "subgraph nodes".
        self._sgn_partitions_ = None
        self._sge_partitions_ = None

        self._sgn_colors_ = None
        self._sge_colors_ = None

        self._gn_partitions_ = None
        self._ge_partitions_ = None

        self._gn_colors_ = None
        self._ge_colors_ = None

        self._node_compat_ = None
        self._edge_compat_ = None

        if node_match is None:
            self.node_equality = self._node_match_maker(lambda n1, n2: True)
            self._sgn_partitions_ = [set(self.subgraph.nodes)]
            self._gn_partitions_ = [set(self.graph.nodes)]
            self._node_compat_ = {0: 0}
        else:
            self.node_equality = self._node_match_maker(node_match)
        if edge_match is None:
            self.edge_equality = self._edge_match_maker(lambda e1, e2: True)
            self._sge_partitions_ = [set(self.subgraph.edges)]
            self._ge_partitions_ = [set(self.graph.edges)]
            self._edge_compat_ = {0: 0}
        else:
            self.edge_equality = self._edge_match_maker(edge_match)
    @property
    def _sgn_partitions(self):
        if self._sgn_partitions_ is None:

            def nodematch(node1, node2):
                return self.node_equality(self.subgraph, node1, self.subgraph, node2)

            self._sgn_partitions_ = make_partitions(self.subgraph.nodes, nodematch)
        return self._sgn_partitions_

    @property
    def _sge_partitions(self):
        if self._sge_partitions_ is None:

            def edgematch(edge1, edge2):
                return self.edge_equality(self.subgraph, edge1, self.subgraph, edge2)

            self._sge_partitions_ = make_partitions(self.subgraph.edges, edgematch)
        return self._sge_partitions_

    @property
    def _gn_partitions(self):
        if self._gn_partitions_ is None:

            def nodematch(node1, node2):
                return self.node_equality(self.graph, node1, self.graph, node2)

            self._gn_partitions_ = make_partitions(self.graph.nodes, nodematch)
        return self._gn_partitions_

    @property
    def _ge_partitions(self):
        if self._ge_partitions_ is None:

            def edgematch(edge1, edge2):
                return self.edge_equality(self.graph, edge1, self.graph, edge2)

            self._ge_partitions_ = make_partitions(self.graph.edges, edgematch)
        return self._ge_partitions_

    @property
    def _sgn_colors(self):
        if self._sgn_colors_ is None:
            self._sgn_colors_ = partition_to_color(self._sgn_partitions)
        return self._sgn_colors_

    @property
    def _sge_colors(self):
        if self._sge_colors_ is None:
            self._sge_colors_ = partition_to_color(self._sge_partitions)
        return self._sge_colors_

    @property
    def _gn_colors(self):
        if self._gn_colors_ is None:
            self._gn_colors_ = partition_to_color(self._gn_partitions)
        return self._gn_colors_

    @property
    def _ge_colors(self):
        if self._ge_colors_ is None:
            self._ge_colors_ = partition_to_color(self._ge_partitions)
        return self._ge_colors_

    @property
    def _node_compatibility(self):
        if self._node_compat_ is not None:
            return self._node_compat_
        self._node_compat_ = {}
        for sgn_part_color, gn_part_color in itertools.product(
            range(len(self._sgn_partitions)), range(len(self._gn_partitions))
        ):
            sgn = next(iter(self._sgn_partitions[sgn_part_color]))
            gn = next(iter(self._gn_partitions[gn_part_color]))
            if self.node_equality(self.subgraph, sgn, self.graph, gn):
                self._node_compat_[sgn_part_color] = gn_part_color
        return self._node_compat_

    @property
    def _edge_compatibility(self):
        if self._edge_compat_ is not None:
            return self._edge_compat_
        self._edge_compat_ = {}
        for sge_part_color, ge_part_color in itertools.product(
            range(len(self._sge_partitions)), range(len(self._ge_partitions))
        ):
            sge = next(iter(self._sge_partitions[sge_part_color]))
            ge = next(iter(self._ge_partitions[ge_part_color]))
            if self.edge_equality(self.subgraph, sge, self.graph, ge):
                self._edge_compat_[sge_part_color] = ge_part_color
        return self._edge_compat_

    @staticmethod
    def _node_match_maker(cmp):
        @wraps(cmp)
        def comparer(graph1, node1, graph2, node2):
            return cmp(graph1.nodes[node1], graph2.nodes[node2])

        return comparer

    @staticmethod
    def _edge_match_maker(cmp):
        @wraps(cmp)
        def comparer(graph1, edge1, graph2, edge2):
            return cmp(graph1.edges[edge1], graph2.edges[edge2])

        return comparer
    def find_isomorphisms(self, symmetry=True):
        """Find all subgraph isomorphisms between subgraph and graph.

        Finds isomorphisms where :attr:`subgraph` <= :attr:`graph`.

        Parameters
        ----------
        symmetry: bool
            Whether symmetry should be taken into account. If False, found
            isomorphisms may be symmetrically equivalent.

        Yields
        ------
        dict
            The found isomorphism mappings of {graph_node: subgraph_node}.
        """
        # The networkx VF2 algorithm is slightly funny in when it yields an
        # empty dict and when not.
        if not self.subgraph:
            yield {}
            return
        elif not self.graph:
            return
        elif len(self.graph) < len(self.subgraph):
            return

        if symmetry:
            _, cosets = self.analyze_symmetry(
                self.subgraph, self._sgn_partitions, self._sge_colors
            )
            constraints = self._make_constraints(cosets)
        else:
            constraints = []

        candidates = self._find_nodecolor_candidates()
        la_candidates = self._get_lookahead_candidates()
        for sgn in self.subgraph:
            extra_candidates = la_candidates[sgn]
            if extra_candidates:
                candidates[sgn] = candidates[sgn] | {frozenset(extra_candidates)}

        if any(candidates.values()):
            start_sgn = min(candidates, key=lambda n: min(candidates[n], key=len))
            candidates[start_sgn] = (intersect(candidates[start_sgn]),)
            yield from self._map_nodes(start_sgn, candidates, constraints)
        else:
            return
    @staticmethod
    def _find_neighbor_color_count(graph, node, node_color, edge_color):
        """
        For `node` in `graph`, count the number of edges of a specific color
        it has to nodes of a specific color.
        """
        counts = Counter()
        neighbors = graph[node]
        for neighbor in neighbors:
            n_color = node_color[neighbor]
            if (node, neighbor) in edge_color:
                e_color = edge_color[node, neighbor]
            else:
                e_color = edge_color[neighbor, node]
            counts[e_color, n_color] += 1
        return counts

    def _get_lookahead_candidates(self):
        """
        Returns a mapping of {subgraph node: set of graph nodes} for which the
        graph nodes are feasible candidates for the subgraph node, as
        determined by looking ahead one edge.
        """
        g_counts = {}
        for gn in self.graph:
            g_counts[gn] = self._find_neighbor_color_count(
                self.graph, gn, self._gn_colors, self._ge_colors
            )
        candidates = defaultdict(set)
        for sgn in self.subgraph:
            sg_count = self._find_neighbor_color_count(
                self.subgraph, sgn, self._sgn_colors, self._sge_colors
            )
            new_sg_count = Counter()
            for (sge_color, sgn_color), count in sg_count.items():
                try:
                    ge_color = self._edge_compatibility[sge_color]
                    gn_color = self._node_compatibility[sgn_color]
                except KeyError:
                    pass
                else:
                    new_sg_count[ge_color, gn_color] = count

            for gn, g_count in g_counts.items():
                if all(new_sg_count[x] <= g_count[x] for x in new_sg_count):
                    # Valid candidate
                    candidates[sgn].add(gn)
        return candidates
    def largest_common_subgraph(self, symmetry=True):
        """Find the largest common induced subgraphs between :attr:`subgraph`
        and :attr:`graph`.

        Parameters
        ----------
        symmetry: bool
            Whether symmetry should be taken into account. If False, found
            largest common subgraphs may be symmetrically equivalent.

        Yields
        ------
        dict
            The found isomorphism mappings of {graph_node: subgraph_node}.
        """
        # The networkx VF2 algorithm is slightly funny in when it yields an
        # empty dict and when not.
        if not self.subgraph:
            yield {}
            return
        elif not self.graph:
            return

        if symmetry:
            _, cosets = self.analyze_symmetry(
                self.subgraph, self._sgn_partitions, self._sge_colors
            )
            constraints = self._make_constraints(cosets)
        else:
            constraints = []

        candidates = self._find_nodecolor_candidates()

        if any(candidates.values()):
            yield from self._largest_common_subgraph(candidates, constraints)
        else:
            return
    def analyze_symmetry(self, graph, node_partitions, edge_colors):
        """Find a minimal set of permutations and corresponding co-sets that
        describe the symmetry of `graph`, given the node and edge equalities
        given by `node_partitions` and `edge_colors`, respectively.

        Parameters
        ----------
        graph : networkx.Graph
            The graph whose symmetry should be analyzed.
        node_partitions : list of sets
            A list of sets containing node keys. Node keys in the same set
            are considered equivalent. Every node key in `graph` should be in
            exactly one of the sets. If all nodes are equivalent, this should
            be ``[set(graph.nodes)]``.
        edge_colors : dict mapping edges to colors
            A dict mapping every edge in `graph` to its corresponding color.
            Edges with the same color are considered equivalent. If all edges
            are equivalent, this should be ``{e: 0 for e in graph.edges}``.

        Returns
        -------
        set[frozenset]
            The found permutations. This is a set of frozensets of pairs of
            node keys which can be exchanged without changing :attr:`subgraph`.
        dict[collections.abc.Hashable, set[collections.abc.Hashable]]
            The found co-sets. The co-sets is a dictionary of
            ``{node key: set of node keys}``. Every key-value pair describes
            which ``values`` can be interchanged without changing nodes less
            than ``key``.
        """
        if self._symmetry_cache is not None:
            key = hash(
                (
                    tuple(graph.nodes),
                    tuple(graph.edges),
                    tuple(map(tuple, node_partitions)),
                    tuple(edge_colors.items()),
                )
            )
            if key in self._symmetry_cache:
                return self._symmetry_cache[key]
        node_partitions = list(
            self._refine_node_partitions(graph, node_partitions, edge_colors)
        )
        assert len(node_partitions) == 1
        node_partitions = node_partitions[0]
        permutations, cosets = self._process_ordered_pair_partitions(
            graph, node_partitions, node_partitions, edge_colors
        )
        if self._symmetry_cache is not None:
            self._symmetry_cache[key] = permutations, cosets
        return permutations, cosets
    def is_isomorphic(self, symmetry=False):
        """Returns True if :attr:`graph` is isomorphic to :attr:`subgraph` and
        False otherwise.

        Returns
        -------
        bool
        """
        return len(self.subgraph) == len(self.graph) and self.subgraph_is_isomorphic(
            symmetry
        )
    def subgraph_is_isomorphic(self, symmetry=False):
        """Returns True if a subgraph of :attr:`graph` is isomorphic to
        :attr:`subgraph` and False otherwise.

        Returns
        -------
        bool
        """
        # symmetry=False, since we only need to know whether there is any
        # example; figuring out all symmetry elements probably costs more time
        # than it gains.
        isom = next(self.subgraph_isomorphisms_iter(symmetry=symmetry), None)
        return isom is not None
    def isomorphisms_iter(self, symmetry=True):
        """Does the same as :meth:`find_isomorphisms` if :attr:`graph` and
        :attr:`subgraph` have the same number of nodes.
        """
        if len(self.graph) == len(self.subgraph):
            yield from self.subgraph_isomorphisms_iter(symmetry=symmetry)
    def subgraph_isomorphisms_iter(self, symmetry=True):
        """Alternative name for :meth:`find_isomorphisms`."""
        return self.find_isomorphisms(symmetry)
    def _find_nodecolor_candidates(self):
        """
        Per node in subgraph, find all nodes in graph that have the same color.
        """
        candidates = defaultdict(set)
        for sgn in self.subgraph.nodes:
            sgn_color = self._sgn_colors[sgn]
            if sgn_color in self._node_compatibility:
                gn_color = self._node_compatibility[sgn_color]
                candidates[sgn].add(frozenset(self._gn_partitions[gn_color]))
            else:
                candidates[sgn].add(frozenset())
        candidates = dict(candidates)
        for sgn, options in candidates.items():
            candidates[sgn] = frozenset(options)
        return candidates

    @staticmethod
    def _make_constraints(cosets):
        """
        Turn cosets into constraints.
        """
        constraints = []
        for node_i, node_ts in cosets.items():
            for node_t in node_ts:
                if node_i != node_t:
                    # Node i must be smaller than node t.
                    constraints.append((node_i, node_t))
        return constraints

    @staticmethod
    def _find_node_edge_color(graph, node_colors, edge_colors):
        """
        For every node in graph, come up with a color that combines 1) the
        color of the node itself, and 2) the number of edges of each color to
        each type of node color.
        """
        counts = defaultdict(lambda: defaultdict(int))
        for node1, node2 in graph.edges:
            if (node1, node2) in edge_colors:
                # FIXME directed graphs
                ecolor = edge_colors[node1, node2]
            else:
                ecolor = edge_colors[node2, node1]
            # Count per node how many edges it has of what color to nodes of
            # what color
            counts[node1][ecolor, node_colors[node2]] += 1
            counts[node2][ecolor, node_colors[node1]] += 1

        node_edge_colors = {}
        for node in graph.nodes:
            node_edge_colors[node] = node_colors[node], set(counts[node].items())

        return node_edge_colors

    @staticmethod
    def _get_permutations_by_length(items):
        """
        Get all permutations of items, but only permute items with the same
        length.

        >>> found = list(ISMAGS._get_permutations_by_length([[1], [2], [3, 4], [4, 5]]))
        >>> answer = [
        ...     (([1], [2]), ([3, 4], [4, 5])),
        ...     (([1], [2]), ([4, 5], [3, 4])),
        ...     (([2], [1]), ([3, 4], [4, 5])),
        ...     (([2], [1]), ([4, 5], [3, 4])),
        ... ]
        >>> found == answer
        True
        """
        by_len = defaultdict(list)
        for item in items:
            by_len[len(item)].append(item)

        yield from itertools.product(
            *(itertools.permutations(by_len[l]) for l in sorted(by_len))
        )

    @classmethod
    def _refine_node_partitions(cls, graph, node_partitions, edge_colors, branch=False):
        """
        Given a partition of nodes in graph, make the partitions smaller such
        that all nodes in a partition have 1) the same color, and 2) the same
        number of edges to specific other partitions.
        """

        def equal_color(node1, node2):
            return node_edge_colors[node1] == node_edge_colors[node2]

        node_partitions = list(node_partitions)
        node_colors = partition_to_color(node_partitions)
        node_edge_colors = cls._find_node_edge_color(graph, node_colors, edge_colors)
        if all(
            are_all_equal(node_edge_colors[node] for node in partition)
            for partition in node_partitions
        ):
            yield node_partitions
            return

        new_partitions = []
        output = [new_partitions]
        for partition in node_partitions:
            if not are_all_equal(node_edge_colors[node] for node in partition):
                refined = make_partitions(partition, equal_color)
                if (
                    branch
                    and len(refined) != 1
                    and len({len(r) for r in refined}) != len([len(r) for r in refined])
                ):
                    # This is where it breaks. There are multiple new cells
                    # in refined with the same length, and their order
                    # matters.
                    # So option 1) Hit it with a big hammer and simply make all
                    # orderings.
                    permutations = cls._get_permutations_by_length(refined)
                    new_output = []
                    for n_p in output:
                        for permutation in permutations:
                            new_output.append(n_p + list(permutation[0]))
                    output = new_output
                else:
                    for n_p in output:
                        n_p.extend(sorted(refined, key=len))
            else:
                for n_p in output:
                    n_p.append(partition)
        for n_p in output:
            yield from cls._refine_node_partitions(graph, n_p, edge_colors, branch)

    def _edges_of_same_color(self, sgn1, sgn2):
        """
        Returns all edges in :attr:`graph` that have the same color as the
        edge between sgn1 and sgn2 in :attr:`subgraph`.
        """
        if (sgn1, sgn2) in self._sge_colors:
            # FIXME directed graphs
            sge_color = self._sge_colors[sgn1, sgn2]
        else:
            sge_color = self._sge_colors[sgn2, sgn1]
        if sge_color in self._edge_compatibility:
            ge_color = self._edge_compatibility[sge_color]
            g_edges = self._ge_partitions[ge_color]
        else:
            g_edges = []
        return g_edges

    def _map_nodes(self, sgn, candidates, constraints, mapping=None, to_be_mapped=None):
        """
        Find all subgraph isomorphisms honoring constraints.
        """
        if mapping is None:
            mapping = {}
        else:
            mapping = mapping.copy()
        if to_be_mapped is None:
            to_be_mapped = set(self.subgraph.nodes)

        # Note, we modify candidates here. Doesn't seem to affect results, but
        # remember this.
        # candidates = candidates.copy()
        sgn_candidates = intersect(candidates[sgn])
        candidates[sgn] = frozenset([sgn_candidates])
        for gn in sgn_candidates:
            # We're going to try to map sgn to gn.
            if gn in mapping.values() or sgn not in to_be_mapped:
                # gn is already mapped to something
                continue  # pragma: no cover

            # REDUCTION and COMBINATION
            mapping[sgn] = gn
            # BASECASE
            if to_be_mapped == set(mapping.keys()):
                yield {v: k for k, v in mapping.items()}
                continue
            left_to_map = to_be_mapped - set(mapping.keys())

            new_candidates = candidates.copy()
            sgn_nbrs = set(self.subgraph[sgn])
            not_gn_nbrs = set(self.graph.nodes) - set(self.graph[gn])
            for sgn2 in left_to_map:
                if sgn2 not in sgn_nbrs:
                    gn2_options = not_gn_nbrs
                else:
                    # Get all edges to gn of the right color:
                    g_edges = self._edges_of_same_color(sgn, sgn2)
                    # FIXME directed graphs
                    # And all nodes involved in those which are connected to gn
                    gn2_options = {n for e in g_edges for n in e if gn in e}
                # Node color compatibility should be taken care of by the
                # initial candidate lists made by find_subgraphs

                # Add gn2_options to the right collection. Since new_candidates
                # is a dict of frozensets of frozensets of node indices it's
                # a bit clunky. We can't do .add, and + also doesn't work. We
                # could do |, but I deem union to be clearer.
                new_candidates[sgn2] = new_candidates[sgn2].union(
                    [frozenset(gn2_options)]
                )

                if (sgn, sgn2) in constraints:
                    gn2_options = {gn2 for gn2 in self.graph if gn2 > gn}
                elif (sgn2, sgn) in constraints:
                    gn2_options = {gn2 for gn2 in self.graph if gn2 < gn}
                else:
                    continue  # pragma: no cover
                new_candidates[sgn2] = new_candidates[sgn2].union(
                    [frozenset(gn2_options)]
                )

            # The next node is the one that is unmapped and has fewest
            # candidates
            next_sgn = min(left_to_map, key=lambda n: min(new_candidates[n], key=len))
            yield from self._map_nodes(
                next_sgn,
                new_candidates,
                constraints,
                mapping=mapping,
                to_be_mapped=to_be_mapped,
            )
            # Unmap sgn-gn. Strictly not necessary since it'd get overwritten
            # when making a new mapping for sgn.
            # del mapping[sgn]

    def _largest_common_subgraph(self, candidates, constraints, to_be_mapped=None):
        """
        Find all largest common subgraphs honoring constraints.
        """
        if to_be_mapped is None:
            to_be_mapped = {frozenset(self.subgraph.nodes)}

        # The LCS problem is basically a repeated subgraph isomorphism problem
        # with smaller and smaller subgraphs. We store the nodes that are
        # "part of" the subgraph in to_be_mapped, and we make it a little
        # smaller every iteration.

        current_size = len(next(iter(to_be_mapped), []))

        found_iso = False
        if current_size <= len(self.graph):
            # There's no point in trying to find isomorphisms of
            # graph >= subgraph if subgraph has more nodes than graph.

            # Try the isomorphism first with the nodes with lowest ID. So sort
            # them. Those are more likely to be part of the final
            # correspondence. This makes finding the first answer(s) faster. In
            # theory.
            for nodes in sorted(to_be_mapped, key=sorted):
                # Find the isomorphism between subgraph[to_be_mapped] <= graph
                next_sgn = min(nodes, key=lambda n: min(candidates[n], key=len))
                isomorphs = self._map_nodes(
                    next_sgn, candidates, constraints, to_be_mapped=nodes
                )

                # This is effectively `yield from isomorphs`, except that we
                # look whether an item was yielded.
                try:
                    item = next(isomorphs)
                except StopIteration:
                    pass
                else:
                    yield item
                    yield from isomorphs
                    found_iso = True

        # BASECASE
        if found_iso or current_size == 1:
            # Shrinking has no point because either 1) we end up with a smaller
            # common subgraph (and we want the largest), or 2) there'll be no
            # more subgraph.
            return

        left_to_be_mapped = set()
        for nodes in to_be_mapped:
            for sgn in nodes:
                # We're going to remove sgn from to_be_mapped, but subject to
                # symmetry constraints. We know that for every constraint we
                # have those subgraph nodes are equal. So whenever we would
                # remove the lower part of a constraint, remove the higher
                # instead. This is all dealt with by _remove_node. And because
                # left_to_be_mapped is a set, we don't do double work.

                # And finally, make the subgraph one node smaller.
                # REDUCTION
                new_nodes = self._remove_node(sgn, nodes, constraints)
                left_to_be_mapped.add(new_nodes)
        # COMBINATION
        yield from self._largest_common_subgraph(
            candidates, constraints, to_be_mapped=left_to_be_mapped
        )

    @staticmethod
    def _remove_node(node, nodes, constraints):
        """
        Returns a new set where node has been removed from nodes, subject to
        symmetry constraints. We know that for every constraint the involved
        subgraph nodes are equal. So whenever we would remove the lower part
        of a constraint, remove the higher instead.
        """
        while True:
            for low, high in constraints:
                if low == node and high in nodes:
                    node = high
                    break
            else:  # no break, couldn't find node in constraints
                break
        return frozenset(nodes - {node})

    @staticmethod
    def _find_permutations(top_partitions, bottom_partitions):
        """
        Return the pairs of top/bottom partitions where the partitions are
        different. Ensures that all partitions in both top and bottom
        partitions have size 1.
        """
        # Find permutations
        permutations = set()
        for top, bot in zip(top_partitions, bottom_partitions):
            # top and bot have only one element
            if len(top) != 1 or len(bot) != 1:
                raise IndexError(
                    "Not all nodes are coupled. This is"
                    f" impossible: {top_partitions}, {bottom_partitions}"
                )
            if top != bot:
                permutations.add(frozenset((next(iter(top)), next(iter(bot)))))
        return permutations

    @staticmethod
    def _update_orbits(orbits, permutations):
        """
        Update orbits based on permutations. Orbits is modified in place.
        For every pair of items in permutations their respective orbits are
        merged.
        """
        for permutation in permutations:
            node, node2 = permutation
            # Find the orbits that contain node and node2, and replace the
            # orbit containing node with the union
            first = second = None
            for idx, orbit in enumerate(orbits):
                if first is not None and second is not None:
                    break
                if node in orbit:
                    first = idx
                if node2 in orbit:
                    second = idx
            if first != second:
                orbits[first].update(orbits[second])
                del orbits[second]

    def _couple_nodes(
        self,
        top_partitions,
        bottom_partitions,
        pair_idx,
        t_node,
        b_node,
        graph,
        edge_colors,
    ):
        """
        Generate new partitions from top and bottom partitions where t_node is
        coupled to b_node. pair_idx is the index of the partitions where
        t_node and b_node can be found.
        """
        t_partition = top_partitions[pair_idx]
        b_partition = bottom_partitions[pair_idx]
        assert t_node in t_partition and b_node in b_partition
        # Couple node to node2. This means they get their own partition
        new_top_partitions = [top.copy() for top in top_partitions]
        new_bottom_partitions = [bot.copy() for bot in bottom_partitions]
        new_t_groups = {t_node}, t_partition - {t_node}
        new_b_groups = {b_node}, b_partition - {b_node}
        # Replace the old partitions with the coupled ones
        del new_top_partitions[pair_idx]
        del new_bottom_partitions[pair_idx]
        new_top_partitions[pair_idx:pair_idx] = new_t_groups
        new_bottom_partitions[pair_idx:pair_idx] = new_b_groups

        new_top_partitions = self._refine_node_partitions(
            graph, new_top_partitions, edge_colors
        )
        new_bottom_partitions = self._refine_node_partitions(
            graph, new_bottom_partitions, edge_colors, branch=True
        )
        new_top_partitions = list(new_top_partitions)
        assert len(new_top_partitions) == 1
        new_top_partitions = new_top_partitions[0]
        for bot in new_bottom_partitions:
            yield list(new_top_partitions), bot

    def _process_ordered_pair_partitions(
        self,
        graph,
        top_partitions,
        bottom_partitions,
        edge_colors,
        orbits=None,
        cosets=None,
    ):
        """
        Processes ordered pair partitions as per the reference paper. Finds
        and returns all permutations and cosets that leave the graph
        unchanged.
        """
        if orbits is None:
            orbits = [{node} for node in graph.nodes]
        else:
            # Note that we don't copy orbits when we are given one. This means
            # we leak information between the recursive branches. This is
            # intentional!
            orbits = orbits
        if cosets is None:
            cosets = {}
        else:
            cosets = cosets.copy()

        assert all(
            len(t_p) == len(b_p) for t_p, b_p in zip(top_partitions, bottom_partitions)
        )

        # BASECASE
        if all(len(top) == 1 for top in top_partitions):
            # All nodes are mapped
            permutations = self._find_permutations(top_partitions, bottom_partitions)
            self._update_orbits(orbits, permutations)
            if permutations:
                return [permutations], cosets
            else:
                return [], cosets

        permutations = []
        unmapped_nodes = {
            (node, idx)
            for idx, t_partition in enumerate(top_partitions)
            for node in t_partition
            if len(t_partition) > 1
        }
        node, pair_idx = min(unmapped_nodes)
        b_partition = bottom_partitions[pair_idx]
        for node2 in sorted(b_partition):
            if len(b_partition) == 1:
                # Can never result in symmetry
                continue
            if node != node2 and any(
                node in orbit and node2 in orbit for orbit in orbits
            ):
                # Orbit prune branch
                continue
            # REDUCTION
            # Couple node to node2
            partitions = self._couple_nodes(
                top_partitions,
                bottom_partitions,
                pair_idx,
                node,
                node2,
                graph,
                edge_colors,
            )
            for opp in partitions:
                new_top_partitions, new_bottom_partitions = opp

                new_perms, new_cosets = self._process_ordered_pair_partitions(
                    graph,
                    new_top_partitions,
                    new_bottom_partitions,
                    edge_colors,
                    orbits,
                    cosets,
                )
                # COMBINATION
                permutations += new_perms
                cosets.update(new_cosets)

        mapped = {
            k
            for top, bottom in zip(top_partitions, bottom_partitions)
            for k in top
            if len(top) == 1 and top == bottom
        }
        ks = {k for k in graph.nodes if k < node}
        # Have all nodes with ID < node been mapped?
        find_coset = ks <= mapped and node not in cosets
        if find_coset:
            # Find the orbit that contains node
            for orbit in orbits:
                if node in orbit:
                    cosets[node] = orbit.copy()
        return permutations, cosets