Skip to content

Wordlift

WordLiftLoader #

Bases: BaseReader

一个用于从WordLift GraphQL API获取和转换数据的读取器类。

属性: endpoint(str):API端点URL。 headers(dict):请求头。 query(str):GraphQL查询。 fields(str):要从API响应中提取的字段。 configure_options(dict):其他配置选项。 page(int):页码。 rows(int):每页的行数。

Source code in llama_index/readers/wordlift/base.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
class WordLiftLoader(BaseReader):
    """一个用于从WordLift GraphQL API获取和转换数据的读取器类。

Args:
    endpoint(str):API端点URL。
    headers(dict):请求头。
    query(str):GraphQL查询。
    fields(str):要从API响应中提取的字段。
    configure_options(dict):其他配置选项。
    page(int):页码。
    rows(int):每页的行数。

属性:
    endpoint(str):API端点URL。
    headers(dict):请求头。
    query(str):GraphQL查询。
    fields(str):要从API响应中提取的字段。
    configure_options(dict):其他配置选项。
    page(int):页码。
    rows(int):每页的行数。"""

    def __init__(self, endpoint, headers, query, fields, configure_options) -> None:
        self.endpoint = endpoint
        self.headers = headers
        self.query = query
        self.fields = fields
        self.configure_options = configure_options

    def fetch_data(self) -> dict:
        """从WordLift GraphQL API获取数据。

返回:
    dict: API响应数据。

引发:
    APIConnectionError: 如果连接到API时出现错误。
"""
        try:
            query = self.alter_query()
            response = requests.post(
                self.endpoint, json={"query": query}, headers=self.headers
            )
            response.raise_for_status()
            data = response.json()
            if ERRORS_KEY in data:
                raise APICallError(data[ERRORS_KEY])
            return data
        except requests.exceptions.RequestException as e:
            logging.error("Error connecting to the API:", exc_info=True)
            raise APICallError("Error connecting to the API") from e

    def transform_data(self, data: dict) -> List[Document]:
        """将获取的数据转换为文档对象列表。

Args:
    data(字典):API响应数据。

Returns:
    List[Document]:转换后的文档列表。

引发:
    DataTransformError:如果在转换数据时出现错误。
"""
        try:
            data = data[DATA_KEY][self.fields]
            documents = []
            text_fields = self.configure_options.get("text_fields", [])
            metadata_fields = self.configure_options.get("metadata_fields", [])

            for item in data:
                if not all(key in item for key in text_fields):
                    logging.warning(
                        f"Skipping document due to missing text fields: {item}"
                    )
                    continue
                row = {}
                for key, value in item.items():
                    if key in text_fields or key in metadata_fields:
                        row[key] = value
                    else:
                        row[key] = clean_value(value)

                text_parts = [
                    get_separated_value(row, field.split("."))
                    for field in text_fields
                    if get_separated_value(row, field.split(".")) is not None
                ]

                text_parts = flatten_list(text_parts)
                text = " ".join(text_parts)

                extra_info = {}
                for field in metadata_fields:
                    field_keys = field.split(".")
                    value = get_separated_value(row, field_keys)
                    if value is None:
                        logging.warning(f"Using default value for {field}")
                        value = "n.a"
                    if isinstance(value, list) and len(value) != 0:
                        value = value[0]
                    if is_url(value) and is_valid_html(value):
                        value = value.replace("\n", "")
                        extra_info[field] = value
                    else:
                        cleaned_value = clean_value(value)
                        cleaned_value = cleaned_value.replace("\n", "")
                        extra_info[field] = cleaned_value
                text = text.replace("\n", "")
                plain_text = re.sub("<.*?>", "", text)
                document = Document(text=plain_text, extra_info=extra_info)
                documents.append(document)

            return documents
        except Exception as e:
            logging.error("Error transforming data:", exc_info=True)
            raise DataTransformError("Error transforming data") from e

    def load_data(self) -> List[Document]:
        """加载数据,通过获取和转换数据。

返回:
    List[Document]: 加载的文档列表。
"""
        try:
            data = self.fetch_data()
            return self.transform_data(data)
        except (APICallError, DataTransformError):
            logging.error("Error loading data:", exc_info=True)
            raise

    def alter_query(self):
        """修改GraphQL查询,添加分页参数。

返回:
    str:带有分页参数的修改后的GraphQL查询。
"""
        from graphql import parse, print_ast
        from graphql.language.ast import ArgumentNode, IntValueNode, NameNode

        DEFAULT_PAGE = 0
        DEFAULT_ROWS = 500

        query = self.query
        page = DEFAULT_PAGE
        rows = DEFAULT_ROWS

        ast = parse(query)

        field_node = ast.definitions[0].selection_set.selections[0]

        if not any(arg.name.value == "page" for arg in field_node.arguments):
            page_argument = ArgumentNode(
                name=NameNode(value="page"), value=IntValueNode(value=page)
            )
            rows_argument = ArgumentNode(
                name=NameNode(value="rows"), value=IntValueNode(value=rows)
            )
            field_node.arguments = (*field_node.arguments, page_argument, rows_argument)
        return print_ast(ast)

fetch_data #

fetch_data() -> dict

从WordLift GraphQL API获取数据。

返回: dict: API响应数据。

引发: APIConnectionError: 如果连接到API时出现错误。

Source code in llama_index/readers/wordlift/base.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
    def fetch_data(self) -> dict:
        """从WordLift GraphQL API获取数据。

返回:
    dict: API响应数据。

引发:
    APIConnectionError: 如果连接到API时出现错误。
"""
        try:
            query = self.alter_query()
            response = requests.post(
                self.endpoint, json={"query": query}, headers=self.headers
            )
            response.raise_for_status()
            data = response.json()
            if ERRORS_KEY in data:
                raise APICallError(data[ERRORS_KEY])
            return data
        except requests.exceptions.RequestException as e:
            logging.error("Error connecting to the API:", exc_info=True)
            raise APICallError("Error connecting to the API") from e

transform_data #

transform_data(data: dict) -> List[Document]

将获取的数据转换为文档对象列表。

Returns:

Type Description
List[Document]

List[Document]:转换后的文档列表。

引发: DataTransformError:如果在转换数据时出现错误。

Source code in llama_index/readers/wordlift/base.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
    def transform_data(self, data: dict) -> List[Document]:
        """将获取的数据转换为文档对象列表。

Args:
    data(字典):API响应数据。

Returns:
    List[Document]:转换后的文档列表。

引发:
    DataTransformError:如果在转换数据时出现错误。
"""
        try:
            data = data[DATA_KEY][self.fields]
            documents = []
            text_fields = self.configure_options.get("text_fields", [])
            metadata_fields = self.configure_options.get("metadata_fields", [])

            for item in data:
                if not all(key in item for key in text_fields):
                    logging.warning(
                        f"Skipping document due to missing text fields: {item}"
                    )
                    continue
                row = {}
                for key, value in item.items():
                    if key in text_fields or key in metadata_fields:
                        row[key] = value
                    else:
                        row[key] = clean_value(value)

                text_parts = [
                    get_separated_value(row, field.split("."))
                    for field in text_fields
                    if get_separated_value(row, field.split(".")) is not None
                ]

                text_parts = flatten_list(text_parts)
                text = " ".join(text_parts)

                extra_info = {}
                for field in metadata_fields:
                    field_keys = field.split(".")
                    value = get_separated_value(row, field_keys)
                    if value is None:
                        logging.warning(f"Using default value for {field}")
                        value = "n.a"
                    if isinstance(value, list) and len(value) != 0:
                        value = value[0]
                    if is_url(value) and is_valid_html(value):
                        value = value.replace("\n", "")
                        extra_info[field] = value
                    else:
                        cleaned_value = clean_value(value)
                        cleaned_value = cleaned_value.replace("\n", "")
                        extra_info[field] = cleaned_value
                text = text.replace("\n", "")
                plain_text = re.sub("<.*?>", "", text)
                document = Document(text=plain_text, extra_info=extra_info)
                documents.append(document)

            return documents
        except Exception as e:
            logging.error("Error transforming data:", exc_info=True)
            raise DataTransformError("Error transforming data") from e

load_data #

load_data() -> List[Document]

加载数据,通过获取和转换数据。

返回: List[Document]: 加载的文档列表。

Source code in llama_index/readers/wordlift/base.py
154
155
156
157
158
159
160
161
162
163
164
165
    def load_data(self) -> List[Document]:
        """加载数据,通过获取和转换数据。

返回:
    List[Document]: 加载的文档列表。
"""
        try:
            data = self.fetch_data()
            return self.transform_data(data)
        except (APICallError, DataTransformError):
            logging.error("Error loading data:", exc_info=True)
            raise

alter_query #

alter_query()

修改GraphQL查询,添加分页参数。

返回: str:带有分页参数的修改后的GraphQL查询。

Source code in llama_index/readers/wordlift/base.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
    def alter_query(self):
        """修改GraphQL查询,添加分页参数。

返回:
    str:带有分页参数的修改后的GraphQL查询。
"""
        from graphql import parse, print_ast
        from graphql.language.ast import ArgumentNode, IntValueNode, NameNode

        DEFAULT_PAGE = 0
        DEFAULT_ROWS = 500

        query = self.query
        page = DEFAULT_PAGE
        rows = DEFAULT_ROWS

        ast = parse(query)

        field_node = ast.definitions[0].selection_set.selections[0]

        if not any(arg.name.value == "page" for arg in field_node.arguments):
            page_argument = ArgumentNode(
                name=NameNode(value="page"), value=IntValueNode(value=page)
            )
            rows_argument = ArgumentNode(
                name=NameNode(value="rows"), value=IntValueNode(value=rows)
            )
            field_node.arguments = (*field_node.arguments, page_argument, rows_argument)
        return print_ast(ast)