Source code for langchain_experimental.data_anonymizer.presidio

from __future__ import annotations

import json
from pathlib import Path
from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Union

import yaml

from langchain_experimental.data_anonymizer.base import (
    DEFAULT_DEANONYMIZER_MATCHING_STRATEGY,
    AnonymizerBase,
    ReversibleAnonymizerBase,
)
from langchain_experimental.data_anonymizer.deanonymizer_mapping import (
    DeanonymizerMapping,
    MappingDataType,
    create_anonymizer_mapping,
)
from langchain_experimental.data_anonymizer.deanonymizer_matching_strategies import (
    exact_matching_strategy,
)
from langchain_experimental.data_anonymizer.faker_presidio_mapping import (
    get_pseudoanonymizer_mapping,
)

if TYPE_CHECKING:
    from presidio_analyzer import AnalyzerEngine, EntityRecognizer
    from presidio_analyzer.nlp_engine import NlpEngineProvider
    from presidio_anonymizer import AnonymizerEngine
    from presidio_anonymizer.entities import ConflictResolutionStrategy, OperatorConfig


def _import_analyzer_engine() -> "AnalyzerEngine":
    try:
        from presidio_analyzer import AnalyzerEngine

    except ImportError as e:
        raise ImportError(
            "Could not import presidio_analyzer, please install with "
            "`pip install presidio-analyzer`. You will also need to download a "
            "spaCy model to use the analyzer, e.g. "
            "`python -m spacy download en_core_web_lg`."
        ) from e
    return AnalyzerEngine


def _import_nlp_engine_provider() -> "NlpEngineProvider":
    try:
        from presidio_analyzer.nlp_engine import NlpEngineProvider

    except ImportError as e:
        raise ImportError(
            "Could not import presidio_analyzer, please install with "
            "`pip install presidio-analyzer`. You will also need to download a "
            "spaCy model to use the analyzer, e.g. "
            "`python -m spacy download en_core_web_lg`."
        ) from e
    return NlpEngineProvider


def _import_anonymizer_engine() -> "AnonymizerEngine":
    try:
        from presidio_anonymizer import AnonymizerEngine
    except ImportError as e:
        raise ImportError(
            "Could not import presidio_anonymizer, please install with "
            "`pip install presidio-anonymizer`."
        ) from e
    return AnonymizerEngine


def _import_operator_config() -> "OperatorConfig":
    try:
        from presidio_anonymizer.entities import OperatorConfig
    except ImportError as e:
        raise ImportError(
            "Could not import presidio_anonymizer, please install with "
            "`pip install presidio-anonymizer`."
        ) from e
    return OperatorConfig


# Configuring Anonymizer for multiple languages
# Detailed description and examples can be found here:
# langchain/docs/extras/guides/privacy/multi_language_anonymization.ipynb
DEFAULT_LANGUAGES_CONFIG = {
    # You can also use Stanza or transformers library.
    # See https://microsoft.github.io/presidio/analyzer/customizing_nlp_models/
    "nlp_engine_name": "spacy",
    "models": [
        {"lang_code": "en", "model_name": "en_core_web_lg"},
        # {"lang_code": "de", "model_name": "de_core_news_md"},
        # {"lang_code": "es", "model_name": "es_core_news_md"},
        # ...
        # List of available models: https://spacy.io/usage/models
    ],
}
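
# Example (illustrative sketch, not executed): a multi-language configuration
# passed via `languages_config`. It assumes the extra spaCy model has been
# downloaded first, e.g. `python -m spacy download de_core_news_md`.
#
#     multilingual_config = {
#         "nlp_engine_name": "spacy",
#         "models": [
#             {"lang_code": "en", "model_name": "en_core_web_lg"},
#             {"lang_code": "de", "model_name": "de_core_news_md"},
#         ],
#     }
#     anonymizer = PresidioAnonymizer(languages_config=multilingual_config)
#     anonymizer.anonymize("Mein Name ist John Doe.", language="de")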


class PresidioAnonymizerBase(AnonymizerBase):
    """Base Anonymizer using Microsoft Presidio.

    See more: https://microsoft.github.io/presidio/
    """

    def __init__(
        self,
        analyzed_fields: Optional[List[str]] = None,
        operators: Optional[Dict[str, OperatorConfig]] = None,
        languages_config: Optional[Dict] = None,
        add_default_faker_operators: bool = True,
        faker_seed: Optional[int] = None,
    ):
        """
        Args:
            analyzed_fields: Fields to detect and then anonymize.
                Defaults to all entities supported by Microsoft Presidio.
            operators: Operators to use for anonymization.
                Operators allow for custom anonymization of detected PII.
                Learn more:
                https://microsoft.github.io/presidio/tutorial/10_simple_anonymization/
            languages_config: Configuration of the NLP engine.
                The first language in the list is used as the main language
                in self.anonymize(...) when no language is specified.
                Learn more:
                https://microsoft.github.io/presidio/analyzer/customizing_nlp_models/
            faker_seed: Seed used to initialize faker.
                Defaults to None, in which case faker will be seeded randomly
                and provide random values.
        """
        if languages_config is None:
            languages_config = DEFAULT_LANGUAGES_CONFIG
        OperatorConfig = _import_operator_config()
        AnalyzerEngine = _import_analyzer_engine()
        NlpEngineProvider = _import_nlp_engine_provider()
        AnonymizerEngine = _import_anonymizer_engine()

        self.analyzed_fields = (
            analyzed_fields
            if analyzed_fields is not None
            else list(get_pseudoanonymizer_mapping().keys())
        )

        if add_default_faker_operators:
            self.operators = {
                field: OperatorConfig(
                    operator_name="custom", params={"lambda": faker_function}
                )
                for field, faker_function in get_pseudoanonymizer_mapping(
                    faker_seed
                ).items()
            }
        else:
            self.operators = {}

        if operators:
            self.add_operators(operators)

        provider = NlpEngineProvider(nlp_configuration=languages_config)
        nlp_engine = provider.create_engine()

        self.supported_languages = list(nlp_engine.nlp.keys())

        self._analyzer = AnalyzerEngine(
            supported_languages=self.supported_languages, nlp_engine=nlp_engine
        )
        self._anonymizer = AnonymizerEngine()
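
    # Example (illustrative sketch, not executed): restrict analysis to a few
    # entity types and replace every detected person with a fixed placeholder,
    # using Presidio's built-in "replace" operator.
    #
    #     from presidio_anonymizer.entities import OperatorConfig
    #
    #     anonymizer = PresidioAnonymizer(
    #         analyzed_fields=["PERSON", "EMAIL_ADDRESS"],
    #         operators={
    #             "PERSON": OperatorConfig("replace", {"new_value": "<PERSON>"})
    #         },
    #     )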

    def add_recognizer(self, recognizer: EntityRecognizer) -> None:
        """Add a recognizer to the analyzer.

        Args:
            recognizer: Recognizer to add to the analyzer.
        """
        self._analyzer.registry.add_recognizer(recognizer)
        self.analyzed_fields.extend(recognizer.supported_entities)
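
    # Example (illustrative sketch, not executed): register a custom Presidio
    # PatternRecognizer so an additional entity type gets detected. The entity
    # name and regex below are made up for illustration.
    #
    #     from presidio_analyzer import Pattern, PatternRecognizer
    #
    #     ticket_recognizer = PatternRecognizer(
    #         supported_entity="TICKET_ID",
    #         patterns=[Pattern(name="ticket", regex=r"TICKET-\d{6}", score=0.8)],
    #     )
    #     anonymizer.add_recognizer(ticket_recognizer)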

    def add_operators(self, operators: Dict[str, OperatorConfig]) -> None:
        """Add operators to the anonymizer.

        Args:
            operators: Operators to add to the anonymizer.
        """
        self.operators.update(operators)
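
    # Example (illustrative sketch, not executed): override how one entity type
    # is anonymized after construction, here with a custom lambda operator
    # (the same "custom" operator mechanism used for the faker defaults above).
    #
    #     from presidio_anonymizer.entities import OperatorConfig
    #
    #     anonymizer.add_operators(
    #         {"EMAIL_ADDRESS": OperatorConfig("custom", {"lambda": lambda _: "<email>"})}
    #     )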


class PresidioAnonymizer(PresidioAnonymizerBase):
    """Anonymizer using Microsoft Presidio."""

    def _anonymize(
        self,
        text: str,
        language: Optional[str] = None,
        allow_list: Optional[List[str]] = None,
        conflict_resolution: Optional[ConflictResolutionStrategy] = None,
    ) -> str:
        """Anonymize text.
        Each PII entity is replaced with a fake value.
        The fake values are different each time, as they are generated randomly.

        PresidioAnonymizer has no built-in memory -
        so it will not remember the effects of anonymizing previous texts.
        >>> anonymizer = PresidioAnonymizer()
        >>> anonymizer.anonymize("My name is John Doe. Hi John Doe!")
        'My name is Noah Rhodes. Hi Noah Rhodes!'
        >>> anonymizer.anonymize("My name is John Doe. Hi John Doe!")
        'My name is Brett Russell. Hi Brett Russell!'

        Args:
            text: text to anonymize
            language: language to use for analysis of PII
                If None, the first (main) language in the list of languages
                specified in the configuration will be used.
        """
        if language is None:
            language = self.supported_languages[0]
        elif language not in self.supported_languages:
            raise ValueError(
                f"Language '{language}' is not supported. "
                f"Supported languages are: {self.supported_languages}. "
                "Change your language configuration file to add more languages."
            )

        # Check supported entities for given language
        # e.g. IT_FISCAL_CODE is not supported for English in Presidio by default
        # If you want to use it, you need to add a recognizer manually
        supported_entities = []
        for recognizer in self._analyzer.get_recognizers(language):
            recognizer_dict = recognizer.to_dict()
            supported_entities.extend(
                [recognizer_dict["supported_entity"]]
                if "supported_entity" in recognizer_dict
                else recognizer_dict["supported_entities"]
            )

        entities_to_analyze = list(
            set(supported_entities).intersection(set(self.analyzed_fields))
        )

        analyzer_results = self._analyzer.analyze(
            text,
            entities=entities_to_analyze,
            language=language,
            allow_list=allow_list,
        )

        filtered_analyzer_results = (
            self._anonymizer._remove_conflicts_and_get_text_manipulation_data(
                analyzer_results, conflict_resolution
            )
        )

        anonymizer_results = self._anonymizer.anonymize(
            text,
            analyzer_results=analyzer_results,
            operators=self.operators,
        )

        anonymizer_mapping = create_anonymizer_mapping(
            text,
            filtered_analyzer_results,
            anonymizer_results,
        )
        return exact_matching_strategy(text, anonymizer_mapping)
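
# Example (illustrative sketch, not executed), assuming the public anonymize()
# wrapper inherited from the base class forwards these keyword arguments to
# _anonymize(): spans in allow_list are left untouched by the analyzer.
#
#     anonymizer = PresidioAnonymizer()
#     anonymizer.anonymize(
#         "My name is John Doe, call me at 313-666-7440",
#         allow_list=["John Doe"],  # keep this exact span unchanged
#     )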


class PresidioReversibleAnonymizer(PresidioAnonymizerBase, ReversibleAnonymizerBase):
    """Reversible Anonymizer using Microsoft Presidio."""

    def __init__(
        self,
        analyzed_fields: Optional[List[str]] = None,
        operators: Optional[Dict[str, OperatorConfig]] = None,
        languages_config: Optional[Dict] = None,
        add_default_faker_operators: bool = True,
        faker_seed: Optional[int] = None,
    ):
        if languages_config is None:
            languages_config = DEFAULT_LANGUAGES_CONFIG
        super().__init__(
            analyzed_fields,
            operators,
            languages_config,
            add_default_faker_operators,
            faker_seed,
        )
        self._deanonymizer_mapping = DeanonymizerMapping()

    @property
    def deanonymizer_mapping(self) -> MappingDataType:
        """Return the deanonymizer mapping."""
        return self._deanonymizer_mapping.data

    @property
    def anonymizer_mapping(self) -> MappingDataType:
        """Return the anonymizer mapping.
        This is just the reverse version of the deanonymizer mapping.
        """
        return {
            key: {v: k for k, v in inner_dict.items()}
            for key, inner_dict in self.deanonymizer_mapping.items()
        }

    def _anonymize(
        self,
        text: str,
        language: Optional[str] = None,
        allow_list: Optional[List[str]] = None,
        conflict_resolution: Optional[ConflictResolutionStrategy] = None,
    ) -> str:
        """Anonymize text.
        Each PII entity is replaced with a fake value.
        The fake values are different each time, as they are generated randomly.
        At the same time, a mapping is created from each anonymized entity
        back to its original text value.

        Thanks to the built-in memory, all previously anonymized entities
        are remembered and replaced by the same fake values:
        >>> anonymizer = PresidioReversibleAnonymizer()
        >>> anonymizer.anonymize("My name is John Doe. Hi John Doe!")
        'My name is Noah Rhodes. Hi Noah Rhodes!'
        >>> anonymizer.anonymize("My name is John Doe. Hi John Doe!")
        'My name is Noah Rhodes. Hi Noah Rhodes!'

        Args:
            text: text to anonymize
            language: language to use for analysis of PII
                If None, the first (main) language in the list of languages
                specified in the configuration will be used.
        """
        if language is None:
            language = self.supported_languages[0]

        if language not in self.supported_languages:
            raise ValueError(
                f"Language '{language}' is not supported. "
                f"Supported languages are: {self.supported_languages}. "
                "Change your language configuration file to add more languages."
            )

        # Check supported entities for given language
        # e.g. IT_FISCAL_CODE is not supported for English in Presidio by default
        # If you want to use it, you need to add a recognizer manually
        supported_entities = []
        for recognizer in self._analyzer.get_recognizers(language):
            recognizer_dict = recognizer.to_dict()
            supported_entities.extend(
                [recognizer_dict["supported_entity"]]
                if "supported_entity" in recognizer_dict
                else recognizer_dict["supported_entities"]
            )

        entities_to_analyze = list(
            set(supported_entities).intersection(set(self.analyzed_fields))
        )

        analyzer_results = self._analyzer.analyze(
            text,
            entities=entities_to_analyze,
            language=language,
            allow_list=allow_list,
        )

        filtered_analyzer_results = (
            self._anonymizer._remove_conflicts_and_get_text_manipulation_data(
                analyzer_results, conflict_resolution
            )
        )

        anonymizer_results = self._anonymizer.anonymize(
            text,
            analyzer_results=analyzer_results,
            operators=self.operators,
        )

        new_deanonymizer_mapping = create_anonymizer_mapping(
            text,
            filtered_analyzer_results,
            anonymizer_results,
            is_reversed=True,
        )
        self._deanonymizer_mapping.update(new_deanonymizer_mapping)

        return exact_matching_strategy(text, self.anonymizer_mapping)

    def _deanonymize(
        self,
        text_to_deanonymize: str,
        deanonymizer_matching_strategy: Callable[
            [str, MappingDataType], str
        ] = DEFAULT_DEANONYMIZER_MATCHING_STRATEGY,
    ) -> str:
        """Deanonymize text.
        Each anonymized entity is replaced with its original value.
        This method exploits the mapping created during the anonymization process.

        Args:
            text_to_deanonymize: text to deanonymize
            deanonymizer_matching_strategy: function used to match anonymized
                entities with their original values and replace them.
        """
        if not self._deanonymizer_mapping:
            raise ValueError(
                "Deanonymizer mapping is empty.",
                "Please call anonymize() and anonymize some text first.",
            )

        text_to_deanonymize = deanonymizer_matching_strategy(
            text_to_deanonymize, self.deanonymizer_mapping
        )

        return text_to_deanonymize
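
    # Example (illustrative sketch, not executed): anonymize/deanonymize round
    # trip. The exact fake values depend on the faker seed.
    #
    #     anonymizer = PresidioReversibleAnonymizer()
    #     anonymized = anonymizer.anonymize("My name is John Doe. Hi John Doe!")
    #     anonymizer.deanonymizer_mapping
    #     # {'PERSON': {'<fake name>': 'John Doe'}}
    #     anonymizer.deanonymize(anonymized)
    #     # 'My name is John Doe. Hi John Doe!'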

    def reset_deanonymizer_mapping(self) -> None:
        """Reset the deanonymizer mapping."""
        self._deanonymizer_mapping = DeanonymizerMapping()

    def save_deanonymizer_mapping(self, file_path: Union[Path, str]) -> None:
        """Save the deanonymizer mapping to a JSON or YAML file.

        Args:
            file_path: Path of the file to save the mapping to.

        Example:
        .. code-block:: python

            anonymizer.save_deanonymizer_mapping(file_path="path/mapping.json")
        """

        save_path = Path(file_path)

        if save_path.suffix not in [".json", ".yaml"]:
            raise ValueError(f"{save_path} must have an extension of .json or .yaml")

        # Make sure parent directories exist
        save_path.parent.mkdir(parents=True, exist_ok=True)

        if save_path.suffix == ".json":
            with open(save_path, "w") as f:
                json.dump(self.deanonymizer_mapping, f, indent=2)
        elif save_path.suffix.endswith((".yaml", ".yml")):
            with open(save_path, "w") as f:
                yaml.dump(self.deanonymizer_mapping, f, default_flow_style=False)

    def load_deanonymizer_mapping(self, file_path: Union[Path, str]) -> None:
        """Load the deanonymizer mapping from a JSON or YAML file.

        Args:
            file_path: Path of the file to load the mapping from.

        Example:
        .. code-block:: python

            anonymizer.load_deanonymizer_mapping(file_path="path/mapping.json")
        """

        load_path = Path(file_path)

        if load_path.suffix not in [".json", ".yaml"]:
            raise ValueError(f"{load_path} must have an extension of .json or .yaml")

        if load_path.suffix == ".json":
            with open(load_path, "r") as f:
                loaded_mapping = json.load(f)
        elif load_path.suffix.endswith((".yaml", ".yml")):
            with open(load_path, "r") as f:
                loaded_mapping = yaml.load(f, Loader=yaml.FullLoader)

        self._deanonymizer_mapping.update(loaded_mapping)
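
    # Example (illustrative sketch, not executed): persist the mapping so that
    # a fresh anonymizer instance can reverse texts anonymized earlier. The
    # file name below is made up for illustration.
    #
    #     anonymizer.save_deanonymizer_mapping("deanonymizer_mapping.json")
    #     ...
    #     restored = PresidioReversibleAnonymizer()
    #     restored.load_deanonymizer_mapping("deanonymizer_mapping.json")
    #     restored.deanonymize(anonymized_text)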