ray.data.read_mongo#

ray.data.read_mongo(uri: str, database: str, collection: str, *, pipeline: List[Dict] | None = None, schema: pymongoarrow.api.Schema | None = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, concurrency: int | None = None, override_num_blocks: int | None = None, **mongo_args) Dataset[源代码]#

从 MongoDB 数据库创建一个 Dataset

要读取的数据通过 MongoDB 的 uridatabasecollection 指定。数据集是根据对 collection 执行 pipeline 的结果创建的。如果 pipeline 为 None,则读取整个 collection

小技巧

有关这些 MongoDB 概念的更多详细信息,请参阅以下内容: - URI: https://www.mongodb.com/docs/manual/reference/connection-string/ - 数据库和集合: https://www.mongodb.com/docs/manual/core/databases-and-collections/ - 管道: https://www.mongodb.com/docs/manual/core/aggregation-pipeline/

为了并行读取 MongoDB,管道的执行是在集合的分区上进行的,每个分区由一个 Ray 读取任务处理。分区的创建旨在将文档均匀分布到指定数量的分区中。分区的数量由 parallelism 决定,可以从这个接口请求,如果未指定则自动选择(参见下面的 parallelism 参数)。

示例

>>> import ray
>>> from pymongoarrow.api import Schema 
>>> ds = ray.data.read_mongo( 
...     uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin", # noqa: E501
...     database="my_db",
...     collection="my_collection",
...     pipeline=[{"$match": {"col2": {"$gte": 0, "$lt": 100}}}, {"$sort": "sort_field"}], # noqa: E501
...     schema=Schema({"col1": pa.string(), "col2": pa.int64()}),
...     override_num_blocks=10,
... )
参数:
  • uri – 数据集读取的源 MongoDB 的 URI。关于 URI 格式,请参阅 MongoDB 文档 中的详细信息。

  • database – MongoDB 中托管的数据库名称。如果该数据库不存在,则会引发 ValueError。

  • collection – 数据库中集合的名称。此集合必须存在,否则会引发 ValueError。

  • pipeline – 一个在给定集合上执行的 MongoDB 管道,其结果用于创建数据集。如果为 None,则将读取整个集合。

  • schema – 用于读取集合的模式。如果为 None,则将从管道的结果中推断。

  • parallelism – 此参数已弃用。请使用 override_num_blocks 参数。

  • ray_remote_args – 传递给读取任务中 remote() 的 kwargs。

  • concurrency – Ray 任务的最大并发运行数量。设置此项以控制并发运行的任务数量。这不会改变运行的总任务数或输出的总块数。默认情况下,并发性是根据可用资源动态决定的。

  • override_num_blocks – 覆盖所有读取任务的输出块数量。默认情况下,输出块的数量是根据输入数据大小和可用资源动态决定的。在大多数情况下,您不应手动设置此值。

  • mongo_args – 传递给 aggregate_arrow_all() 的 kwargs,用于在 pymongoarrow 中生成 Arrow 格式的结果。

返回:

Dataset 从在指定 MongoDB 集合上执行管道的结果中生成行。

抛出:

PublicAPI (alpha): 此API处于alpha阶段,可能在稳定之前发生变化。