⌘+k ctrl+k
1.1.3 (stable)
Search Shortcut cmd + k | ctrl + k
ADBC API

Arrow 数据库连接 (ADBC),类似于 ODBC 和 JDBC,是一种 C 风格的 API,可以在不同的数据库系统之间实现代码的可移植性。这使得开发者能够轻松构建与数据库系统通信的应用程序,而无需使用特定于该系统的代码。ADBC 与 ODBC/JDBC 的主要区别在于,ADBC 使用 Arrow 在数据库系统和应用程序之间传输数据。DuckDB 有一个 ADBC 驱动程序,它利用了 DuckDB 和 Arrow 之间的零拷贝集成 来高效地传输数据。

DuckDB的ADBC驱动程序目前支持ADBC的0.7版本。

请参考ADBC文档页面以获取关于ADBC的更广泛讨论和详细的API解释。

已实现的功能

DuckDB-ADBC 驱动程序实现了完整的 ADBC 规范,除了 ConnectionReadPartitionStatementExecutePartitions 函数。这两个函数的存在是为了支持内部对查询结果进行分区的系统,这不适用于 DuckDB。 在本节中,我们将描述 ADBC 中存在的主要函数,以及它们所接受的参数,并为每个函数提供示例。

数据库

一组在数据库上操作的函数。

函数名称 描述 参数 示例
DatabaseNew 分配一个新的(但未初始化的)数据库。 (AdbcDatabase *database, AdbcError *error) AdbcDatabaseNew(&adbc_database, &adbc_error)
DatabaseSetOption 设置一个char*选项。 (AdbcDatabase *database, const char *key, const char *value, AdbcError *error) AdbcDatabaseSetOption(&adbc_database, "path", "test.db", &adbc_error)
DatabaseInit 完成设置选项并初始化数据库。 (AdbcDatabase *database, AdbcError *error) AdbcDatabaseInit(&adbc_database, &adbc_error)
DatabaseRelease 销毁数据库。 (AdbcDatabase *database, AdbcError *error) AdbcDatabaseRelease(&adbc_database, &adbc_error)

Connection

一组用于创建和销毁与数据库交互的连接的函数。

函数名称 描述 参数 示例
ConnectionNew 分配一个新的(但未初始化的)连接。 (AdbcConnection*, AdbcError*) AdbcConnectionNew(&adbc_connection, &adbc_error)
ConnectionSetOption 选项可以在ConnectionInit之前设置。 (AdbcConnection*, const char*, const char*, AdbcError*) AdbcConnectionSetOption(&adbc_connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT, ADBC_OPTION_VALUE_DISABLED, &adbc_error)
ConnectionInit 完成选项设置并初始化连接。 (AdbcConnection*, AdbcDatabase*, AdbcError*) AdbcConnectionInit(&adbc_connection, &adbc_database, &adbc_error)
ConnectionRelease 销毁此连接。 (AdbcConnection*, AdbcError*) AdbcConnectionRelease(&adbc_connection, &adbc_error)

一组用于检索数据库元数据的函数。通常,这些函数将返回Arrow对象,特别是ArrowArrayStream。

Function name Description Arguments Example
ConnectionGetObjects 获取所有目录、数据库模式、表和列的层次视图。 (AdbcConnection*, int, const char*, const char*, const char*, const char**, const char*, ArrowArrayStream*, AdbcError*) AdbcDatabaseInit(&adbc_database, &adbc_error)
ConnectionGetTableSchema 获取表的Arrow模式。 (AdbcConnection*, const char*, const char*, const char*, ArrowSchema*, AdbcError*) AdbcDatabaseRelease(&adbc_database, &adbc_error)
ConnectionGetTableTypes 获取数据库中的表类型列表。 (AdbcConnection*, ArrowArrayStream*, AdbcError*) AdbcDatabaseNew(&adbc_database, &adbc_error)

一组具有事务语义的函数,用于连接。默认情况下,所有连接都以自动提交模式启动,但可以通过ConnectionSetOption函数关闭此模式。

Function name Description Arguments Example
ConnectionCommit 提交任何挂起的事务。 (AdbcConnection*, AdbcError*) AdbcConnectionCommit(&adbc_connection, &adbc_error)
ConnectionRollback 回滚任何挂起的事务。 (AdbcConnection*, AdbcError*) AdbcConnectionRollback(&adbc_connection, &adbc_error)

Statement

语句保存与查询执行相关的状态。它们代表一次性查询和预编译语句。它们可以重复使用;然而,这样做将使该语句之前的任何结果集失效。

用于创建、销毁和设置语句选项的函数:

Function name Description Arguments Example
StatementNew 为给定的连接创建一个新的语句。 (AdbcConnection*, AdbcStatement*, AdbcError*) AdbcStatementNew(&adbc_connection, &adbc_statement, &adbc_error)
StatementRelease 销毁一个语句。 (AdbcStatement*, AdbcError*) AdbcStatementRelease(&adbc_statement, &adbc_error)
StatementSetOption 在语句上设置一个字符串选项。 (AdbcStatement*, const char*, const char*, AdbcError*) StatementSetOption(&adbc_statement, ADBC_INGEST_OPTION_TARGET_TABLE, "TABLE_NAME", &adbc_error)

与查询执行相关的函数:

Function name Description Arguments Example
StatementSetSqlQuery 设置要执行的SQL查询。然后可以使用StatementExecuteQuery执行查询。 (AdbcStatement*, const char*, AdbcError*) AdbcStatementSetSqlQuery(&adbc_statement, "SELECT * FROM TABLE", &adbc_error)
StatementSetSubstraitPlan 设置一个substrait计划来执行。然后可以使用StatementExecuteQuery执行查询。 (AdbcStatement*, const uint8_t*, size_t, AdbcError*) AdbcStatementSetSubstraitPlan(&adbc_statement, substrait_plan, length, &adbc_error)
StatementExecuteQuery 执行一个语句并获取结果。 (AdbcStatement*, ArrowArrayStream*, int64_t*, AdbcError*) AdbcStatementExecuteQuery(&adbc_statement, &arrow_stream, &rows_affected, &adbc_error)
StatementPrepare 将此语句转换为准备语句,以便多次执行。 (AdbcStatement*, AdbcError*) AdbcStatementPrepare(&adbc_statement, &adbc_error)

与绑定相关的函数,用于批量插入或在预处理语句中使用。

Function name Description Arguments Example
StatementBindStream 绑定Arrow流。这可以用于批量插入或预编译语句。 (AdbcStatement*, ArrowArrayStream*, AdbcError*) StatementBindStream(&adbc_statement, &input_data, &adbc_error)

Examples

无论使用哪种编程语言,使用ADBC与DuckDB时都需要两个数据库选项。第一个是driver,它需要DuckDB库的路径。第二个选项是entrypoint,它是从DuckDB-ADBC驱动程序中导出的一个函数,用于初始化所有ADBC函数。配置完这两个选项后,我们可以选择性地设置path选项,提供一个磁盘路径来存储我们的DuckDB数据库。如果未设置,则会创建一个内存数据库。配置完所有必要的选项后,我们可以继续初始化我们的数据库。以下是您可以在各种不同语言环境中执行此操作的方法。

C++

我们通过声明通过ADBC查询数据所需的基本变量来开始我们的C++示例。这些变量包括Error、Database、Connection、Statement处理,以及一个用于在DuckDB和应用程序之间传输数据的Arrow Stream。

AdbcError adbc_error;
AdbcDatabase adbc_database;
AdbcConnection adbc_connection;
AdbcStatement adbc_statement;
ArrowArrayStream arrow_stream;

然后我们可以初始化我们的数据库变量。在初始化数据库之前,我们需要设置上面提到的driverentrypoint选项。然后我们设置path选项并初始化数据库。在下面的示例中,字符串"path/to/libduckdb.dylib"应该是DuckDB动态库的路径。在macOS上这将是.dylib,在Linux上将是.so

AdbcDatabaseNew(&adbc_database, &adbc_error);
AdbcDatabaseSetOption(&adbc_database, "driver", "path/to/libduckdb.dylib", &adbc_error);
AdbcDatabaseSetOption(&adbc_database, "entrypoint", "duckdb_adbc_init", &adbc_error);
// By default, we start an in-memory database, but you can optionally define a path to store it on disk.
AdbcDatabaseSetOption(&adbc_database, "path", "test.db", &adbc_error);
AdbcDatabaseInit(&adbc_database, &adbc_error);

初始化数据库后,我们必须创建并初始化一个连接。

AdbcConnectionNew(&adbc_connection, &adbc_error);
AdbcConnectionInit(&adbc_connection, &adbc_database, &adbc_error);

我们现在可以初始化我们的语句并通过我们的连接运行查询。在AdbcStatementExecuteQuery之后,arrow_stream会被填充结果。

AdbcStatementNew(&adbc_connection, &adbc_statement, &adbc_error);
AdbcStatementSetSqlQuery(&adbc_statement, "SELECT 42", &adbc_error);
int64_t rows_affected;
AdbcStatementExecuteQuery(&adbc_statement, &arrow_stream, &rows_affected, &adbc_error);
arrow_stream.release(arrow_stream)

除了运行查询,我们还可以通过arrow_streams来摄取数据。为此,我们需要设置一个选项,指定我们要插入的表名,绑定流,然后执行查询。

StatementSetOption(&adbc_statement, ADBC_INGEST_OPTION_TARGET_TABLE, "AnswerToEverything", &adbc_error);
StatementBindStream(&adbc_statement, &arrow_stream, &adbc_error);
StatementExecuteQuery(&adbc_statement, nullptr, nullptr, &adbc_error);

Python

首先要做的是使用pip并安装ADBC驱动程序管理器。您还需要安装pyarrow以直接访问Apache Arrow格式的结果集(例如使用fetch_arrow_table)。

pip install adbc_driver_manager pyarrow

有关adbc_driver_manager包的详细信息,请参阅adbc_driver_manager包文档

与C++类似,我们需要提供初始化选项,包括libduckdb共享对象的位置和入口函数。请注意,DuckDB的path参数是通过db_kwargs字典传递的。

import adbc_driver_duckdb.dbapi

with adbc_driver_duckdb.dbapi.connect("test.db") as conn, conn.cursor() as cur:
    cur.execute("SELECT 42")
    # fetch a pyarrow table
    tbl = cur.fetch_arrow_table()
    print(tbl)

除了fetch_arrow_table,DBApi的其他方法也在游标上实现,例如fetchonefetchall。数据也可以通过arrow_streams进行摄取。我们只需要在语句上设置选项来绑定数据流并执行查询。

import adbc_driver_duckdb.dbapi
import pyarrow

data = pyarrow.record_batch(
    [[1, 2, 3, 4], ["a", "b", "c", "d"]],
    names = ["ints", "strs"],
)

with adbc_driver_duckdb.dbapi.connect("test.db") as conn, conn.cursor() as cur:
    cur.adbc_ingest("AnswerToEverything", data)

Go

请确保首先从发布页面下载libduckdb库(即在Linux上的.so,Mac上的.dylib或Windows上的.dll),并在运行代码之前将其放在LD_LIBRARY_PATH上(但如果不这样做,错误信息会解释有关此文件位置的选项。)

以下示例使用内存中的DuckDB数据库通过SQL查询修改内存中的Arrow RecordBatches:

package main

import (
    "bytes"
    "context"
    "fmt"
    "io"

    "github.com/apache/arrow-adbc/go/adbc"
    "github.com/apache/arrow-adbc/go/adbc/drivermgr"
    "github.com/apache/arrow/go/v17/arrow"
    "github.com/apache/arrow/go/v17/arrow/array"
    "github.com/apache/arrow/go/v17/arrow/ipc"
    "github.com/apache/arrow/go/v17/arrow/memory"
)

func _makeSampleArrowRecord() arrow.Record {
    b := array.NewFloat64Builder(memory.DefaultAllocator)
    b.AppendValues([]float64{1, 2, 3}, nil)
    col := b.NewArray()

    defer col.Release()
    defer b.Release()

    schema := arrow.NewSchema([]arrow.Field{{Name: "column1", Type: arrow.PrimitiveTypes.Float64}}, nil)
    return array.NewRecord(schema, []arrow.Array{col}, int64(col.Len()))
}

type DuckDBSQLRunner struct {
    ctx  context.Context
    conn adbc.Connection
    db   adbc.Database
}

func NewDuckDBSQLRunner(ctx context.Context) (*DuckDBSQLRunner, error) {
    var drv drivermgr.Driver
    db, err := drv.NewDatabase(map[string]string{
        "driver":     "duckdb",
        "entrypoint": "duckdb_adbc_init",
        "path":       ":memory:",
    })
    if err != nil {
        return nil, fmt.Errorf("failed to create new in-memory DuckDB database: %w", err)
    }
    conn, err := db.Open(ctx)
    if err != nil {
        return nil, fmt.Errorf("failed to open connection to new in-memory DuckDB database: %w", err)
    }
    return &DuckDBSQLRunner{ctx: ctx, conn: conn, db: db}, nil
}

func serializeRecord(record arrow.Record) (io.Reader, error) {
    buf := new(bytes.Buffer)
    wr := ipc.NewWriter(buf, ipc.WithSchema(record.Schema()))
    if err := wr.Write(record); err != nil {
        return nil, fmt.Errorf("failed to write record: %w", err)
    }
    if err := wr.Close(); err != nil {
        return nil, fmt.Errorf("failed to close writer: %w", err)
    }
    return buf, nil
}

func (r *DuckDBSQLRunner) importRecord(sr io.Reader) error {
    rdr, err := ipc.NewReader(sr)
    if err != nil {
        return fmt.Errorf("failed to create IPC reader: %w", err)
    }
    defer rdr.Release()
    stmt, err := r.conn.NewStatement()
    if err != nil {
        return fmt.Errorf("failed to create new statement: %w", err)
    }
    if err := stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeCreate); err != nil {
        return fmt.Errorf("failed to set ingest mode: %w", err)
    }
    if err := stmt.SetOption(adbc.OptionKeyIngestTargetTable, "temp_table"); err != nil {
        return fmt.Errorf("failed to set ingest target table: %w", err)
    }
    if err := stmt.BindStream(r.ctx, rdr); err != nil {
        return fmt.Errorf("failed to bind stream: %w", err)
    }
    if _, err := stmt.ExecuteUpdate(r.ctx); err != nil {
        return fmt.Errorf("failed to execute update: %w", err)
    }
    return stmt.Close()
}

func (r *DuckDBSQLRunner) runSQL(sql string) ([]arrow.Record, error) {
    stmt, err := r.conn.NewStatement()
    if err != nil {
        return nil, fmt.Errorf("failed to create new statement: %w", err)
    }
    defer stmt.Close()

    if err := stmt.SetSqlQuery(sql); err != nil {
        return nil, fmt.Errorf("failed to set SQL query: %w", err)
    }
    out, n, err := stmt.ExecuteQuery(r.ctx)
    if err != nil {
        return nil, fmt.Errorf("failed to execute query: %w", err)
    }
    defer out.Release()

    result := make([]arrow.Record, 0, n)
    for out.Next() {
        rec := out.Record()
        rec.Retain() // .Next() will release the record, so we need to retain it
        result = append(result, rec)
    }
    if out.Err() != nil {
        return nil, out.Err()
    }
    return result, nil
}

func (r *DuckDBSQLRunner) RunSQLOnRecord(record arrow.Record, sql string) ([]arrow.Record, error) {
    serializedRecord, err := serializeRecord(record)
    if err != nil {
        return nil, fmt.Errorf("failed to serialize record: %w", err)
    }
    if err := r.importRecord(serializedRecord); err != nil {
        return nil, fmt.Errorf("failed to import record: %w", err)
    }
    result, err := r.runSQL(sql)
    if err != nil {
        return nil, fmt.Errorf("failed to run SQL: %w", err)
    }

    if _, err := r.runSQL("DROP TABLE temp_table"); err != nil {
        return nil, fmt.Errorf("failed to drop temp table after running query: %w", err)
    }
    return result, nil
}

func (r *DuckDBSQLRunner) Close() {
    r.conn.Close()
    r.db.Close()
}

func main() {
    rec := _makeSampleArrowRecord()
    fmt.Println(rec)

    runner, err := NewDuckDBSQLRunner(context.Background())
    if err != nil {
        panic(err)
    }
    defer runner.Close()

    resultRecords, err := runner.RunSQLOnRecord(rec, "SELECT column1+1 FROM temp_table")
    if err != nil {
        panic(err)
    }

    for _, resultRecord := range resultRecords {
        fmt.Println(resultRecord)
        resultRecord.Release()
    }
}

运行它会产生以下输出:

record:
  schema:
  fields: 1
    - column1: type=float64
  rows: 3
  col[0][column1]: [1 2 3]

record:
  schema:
  fields: 1
    - (column1 + 1): type=float64, nullable
  rows: 3
  col[0][(column1 + 1)]: [2 3 4]