ray.data.read_mongo#
- ray.data.read_mongo(uri: str, database: str, collection: str, *, pipeline: List[Dict] | None = None, schema: pymongoarrow.api.Schema | None = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, concurrency: int | None = None, override_num_blocks: int | None = None, **mongo_args) Dataset [源代码]#
从 MongoDB 数据库创建一个
Dataset
。要读取的数据通过 MongoDB 的
uri
、database
和collection
指定。数据集是根据对collection
执行pipeline
的结果创建的。如果pipeline
为 None,则读取整个collection
。小技巧
有关这些 MongoDB 概念的更多详细信息,请参阅以下内容: - URI: https://www.mongodb.com/docs/manual/reference/connection-string/ - 数据库和集合: https://www.mongodb.com/docs/manual/core/databases-and-collections/ - 管道: https://www.mongodb.com/docs/manual/core/aggregation-pipeline/
为了并行读取 MongoDB,管道的执行是在集合的分区上进行的,每个分区由一个 Ray 读取任务处理。分区的创建旨在将文档均匀分布到指定数量的分区中。分区的数量由
parallelism
决定,可以从这个接口请求,如果未指定则自动选择(参见下面的parallelism
参数)。示例
>>> import ray >>> from pymongoarrow.api import Schema >>> ds = ray.data.read_mongo( ... uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin", # noqa: E501 ... database="my_db", ... collection="my_collection", ... pipeline=[{"$match": {"col2": {"$gte": 0, "$lt": 100}}}, {"$sort": "sort_field"}], # noqa: E501 ... schema=Schema({"col1": pa.string(), "col2": pa.int64()}), ... override_num_blocks=10, ... )
- 参数:
uri – 数据集读取的源 MongoDB 的 URI。关于 URI 格式,请参阅 MongoDB 文档 中的详细信息。
database – MongoDB 中托管的数据库名称。如果该数据库不存在,则会引发 ValueError。
collection – 数据库中集合的名称。此集合必须存在,否则会引发 ValueError。
pipeline – 一个在给定集合上执行的 MongoDB 管道,其结果用于创建数据集。如果为 None,则将读取整个集合。
schema – 用于读取集合的模式。如果为 None,则将从管道的结果中推断。
parallelism – 此参数已弃用。请使用
override_num_blocks
参数。ray_remote_args – 传递给读取任务中
remote()
的 kwargs。concurrency – Ray 任务的最大并发运行数量。设置此项以控制并发运行的任务数量。这不会改变运行的总任务数或输出的总块数。默认情况下,并发性是根据可用资源动态决定的。
override_num_blocks – 覆盖所有读取任务的输出块数量。默认情况下,输出块的数量是根据输入数据大小和可用资源动态决定的。在大多数情况下,您不应手动设置此值。
mongo_args – 传递给 aggregate_arrow_all() 的 kwargs,用于在 pymongoarrow 中生成 Arrow 格式的结果。
- 返回:
Dataset
从在指定 MongoDB 集合上执行管道的结果中生成行。- 抛出:
ValueError – 如果
database
不存在。ValueError – 如果
collection
不存在。
PublicAPI (alpha): 此API处于alpha阶段,可能在稳定之前发生变化。