ray.data.read_sql#

ray.data.read_sql(sql: str, connection_factory: Callable[[], Any], *, parallelism: int = -1, ray_remote_args: Dict[str, Any] | None = None, concurrency: int | None = None, override_num_blocks: int | None = None) Dataset[源代码]#

从一个提供 Python DB API2 兼容 连接器的 数据库中读取。

备注

默认情况下,read_sql 启动多个读取任务,每个任务执行一个 LIMITOFFSET 来获取行的子集。然而,对于许多数据库,OFFSET 是缓慢的。

作为一种变通方法,设置 override_num_blocks=1 以直接在一个任务中获取所有行。请注意,这种方法要求所有结果行都能适应单个任务的内存。如果行数过多,程序可能会引发内存不足错误。

示例

关于从MySQL和PostgreSQL等大型数据库读取的示例,请参见 从SQL数据库读取

import sqlite3

import ray

# Create a simple database
connection = sqlite3.connect("example.db")
connection.execute("CREATE TABLE movie(title, year, score)")
connection.execute(
    """
    INSERT INTO movie VALUES
        ('Monty Python and the Holy Grail', 1975, 8.2),
        ("Monty Python Live at the Hollywood Bowl", 1982, 7.9),
        ("Monty Python's Life of Brian", 1979, 8.0),
        ("Rocky II", 1979, 7.3)
    """
)
connection.commit()
connection.close()

def create_connection():
    return sqlite3.connect("example.db")

# Get all movies
ds = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies after the year 1980
ds = ray.data.read_sql(
    "SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
ds = ray.data.read_sql(
    "SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)
参数:
  • sql – 要执行的SQL查询。

  • connection_factory – 一个不接受参数并返回 Python DB API2 连接对象 的函数。

  • parallelism – 此参数已弃用。请使用 override_num_blocks 参数。

  • ray_remote_args – 传递给读取任务中 remote() 的 kwargs。

  • concurrency – Ray 任务的最大并发运行数量。设置此项以控制并发运行的任务数量。这不会改变运行的总任务数或输出的总块数。默认情况下,并发性是根据可用资源动态决定的。

  • override_num_blocks – 覆盖所有读取任务的输出块数量。默认情况下,输出块的数量是根据输入数据大小和可用资源动态决定的。在大多数情况下,您不应手动设置此值。

返回:

包含查询数据的 数据集

PublicAPI (alpha): 此API处于alpha阶段,可能在稳定之前发生变化。