Arrow 数据库连接 (ADBC),类似于 ODBC 和 JDBC,是一种 C 风格的 API,可以在不同的数据库系统之间实现代码的可移植性。这使得开发者能够轻松构建与数据库系统通信的应用程序,而无需使用特定于该系统的代码。ADBC 与 ODBC/JDBC 的主要区别在于,ADBC 使用 Arrow 在数据库系统和应用程序之间传输数据。DuckDB 有一个 ADBC 驱动程序,它利用了 DuckDB 和 Arrow 之间的零拷贝集成 来高效地传输数据。
DuckDB的ADBC驱动程序目前支持ADBC的0.7版本。
请参考ADBC文档页面以获取关于ADBC的更广泛讨论和详细的API解释。
已实现的功能
DuckDB-ADBC 驱动程序实现了完整的 ADBC 规范,除了 ConnectionReadPartition
和 StatementExecutePartitions
函数。这两个函数的存在是为了支持内部对查询结果进行分区的系统,这不适用于 DuckDB。
在本节中,我们将描述 ADBC 中存在的主要函数,以及它们所接受的参数,并为每个函数提供示例。
数据库
一组在数据库上操作的函数。
函数名称 | 描述 | 参数 | 示例 |
---|---|---|---|
DatabaseNew |
分配一个新的(但未初始化的)数据库。 | (AdbcDatabase *database, AdbcError *error) |
AdbcDatabaseNew(&adbc_database, &adbc_error) |
DatabaseSetOption |
设置一个char*选项。 | (AdbcDatabase *database, const char *key, const char *value, AdbcError *error) |
AdbcDatabaseSetOption(&adbc_database, "path", "test.db", &adbc_error) |
DatabaseInit |
完成设置选项并初始化数据库。 | (AdbcDatabase *database, AdbcError *error) |
AdbcDatabaseInit(&adbc_database, &adbc_error) |
DatabaseRelease |
销毁数据库。 | (AdbcDatabase *database, AdbcError *error) |
AdbcDatabaseRelease(&adbc_database, &adbc_error) |
Connection
一组用于创建和销毁与数据库交互的连接的函数。
函数名称 | 描述 | 参数 | 示例 |
---|---|---|---|
ConnectionNew |
分配一个新的(但未初始化的)连接。 | (AdbcConnection*, AdbcError*) |
AdbcConnectionNew(&adbc_connection, &adbc_error) |
ConnectionSetOption |
选项可以在ConnectionInit之前设置。 | (AdbcConnection*, const char*, const char*, AdbcError*) |
AdbcConnectionSetOption(&adbc_connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT, ADBC_OPTION_VALUE_DISABLED, &adbc_error) |
ConnectionInit |
完成选项设置并初始化连接。 | (AdbcConnection*, AdbcDatabase*, AdbcError*) |
AdbcConnectionInit(&adbc_connection, &adbc_database, &adbc_error) |
ConnectionRelease |
销毁此连接。 | (AdbcConnection*, AdbcError*) |
AdbcConnectionRelease(&adbc_connection, &adbc_error) |
一组用于检索数据库元数据的函数。通常,这些函数将返回Arrow对象,特别是ArrowArrayStream。
Function name | Description | Arguments | Example |
---|---|---|---|
ConnectionGetObjects |
获取所有目录、数据库模式、表和列的层次视图。 | (AdbcConnection*, int, const char*, const char*, const char*, const char**, const char*, ArrowArrayStream*, AdbcError*) |
AdbcDatabaseInit(&adbc_database, &adbc_error) |
ConnectionGetTableSchema |
获取表的Arrow模式。 | (AdbcConnection*, const char*, const char*, const char*, ArrowSchema*, AdbcError*) |
AdbcDatabaseRelease(&adbc_database, &adbc_error) |
ConnectionGetTableTypes |
获取数据库中的表类型列表。 | (AdbcConnection*, ArrowArrayStream*, AdbcError*) |
AdbcDatabaseNew(&adbc_database, &adbc_error) |
一组具有事务语义的函数,用于连接。默认情况下,所有连接都以自动提交模式启动,但可以通过ConnectionSetOption函数关闭此模式。
Function name | Description | Arguments | Example |
---|---|---|---|
ConnectionCommit |
提交任何挂起的事务。 | (AdbcConnection*, AdbcError*) |
AdbcConnectionCommit(&adbc_connection, &adbc_error) |
ConnectionRollback |
回滚任何挂起的事务。 | (AdbcConnection*, AdbcError*) |
AdbcConnectionRollback(&adbc_connection, &adbc_error) |
Statement
语句保存与查询执行相关的状态。它们代表一次性查询和预编译语句。它们可以重复使用;然而,这样做将使该语句之前的任何结果集失效。
用于创建、销毁和设置语句选项的函数:
Function name | Description | Arguments | Example |
---|---|---|---|
StatementNew |
为给定的连接创建一个新的语句。 | (AdbcConnection*, AdbcStatement*, AdbcError*) |
AdbcStatementNew(&adbc_connection, &adbc_statement, &adbc_error) |
StatementRelease |
销毁一个语句。 | (AdbcStatement*, AdbcError*) |
AdbcStatementRelease(&adbc_statement, &adbc_error) |
StatementSetOption |
在语句上设置一个字符串选项。 | (AdbcStatement*, const char*, const char*, AdbcError*) |
StatementSetOption(&adbc_statement, ADBC_INGEST_OPTION_TARGET_TABLE, "TABLE_NAME", &adbc_error) |
与查询执行相关的函数:
Function name | Description | Arguments | Example |
---|---|---|---|
StatementSetSqlQuery |
设置要执行的SQL查询。然后可以使用StatementExecuteQuery执行查询。 | (AdbcStatement*, const char*, AdbcError*) |
AdbcStatementSetSqlQuery(&adbc_statement, "SELECT * FROM TABLE", &adbc_error) |
StatementSetSubstraitPlan |
设置一个substrait计划来执行。然后可以使用StatementExecuteQuery执行查询。 | (AdbcStatement*, const uint8_t*, size_t, AdbcError*) |
AdbcStatementSetSubstraitPlan(&adbc_statement, substrait_plan, length, &adbc_error) |
StatementExecuteQuery |
执行一个语句并获取结果。 | (AdbcStatement*, ArrowArrayStream*, int64_t*, AdbcError*) |
AdbcStatementExecuteQuery(&adbc_statement, &arrow_stream, &rows_affected, &adbc_error) |
StatementPrepare |
将此语句转换为准备语句,以便多次执行。 | (AdbcStatement*, AdbcError*) |
AdbcStatementPrepare(&adbc_statement, &adbc_error) |
与绑定相关的函数,用于批量插入或在预处理语句中使用。
Function name | Description | Arguments | Example |
---|---|---|---|
StatementBindStream |
绑定Arrow流。这可以用于批量插入或预编译语句。 | (AdbcStatement*, ArrowArrayStream*, AdbcError*) |
StatementBindStream(&adbc_statement, &input_data, &adbc_error) |
Examples
无论使用哪种编程语言,使用ADBC与DuckDB时都需要两个数据库选项。第一个是driver
,它需要DuckDB库的路径。第二个选项是entrypoint
,它是从DuckDB-ADBC驱动程序中导出的一个函数,用于初始化所有ADBC函数。配置完这两个选项后,我们可以选择性地设置path
选项,提供一个磁盘路径来存储我们的DuckDB数据库。如果未设置,则会创建一个内存数据库。配置完所有必要的选项后,我们可以继续初始化我们的数据库。以下是您可以在各种不同语言环境中执行此操作的方法。
C++
我们通过声明通过ADBC查询数据所需的基本变量来开始我们的C++示例。这些变量包括Error、Database、Connection、Statement处理,以及一个用于在DuckDB和应用程序之间传输数据的Arrow Stream。
AdbcError adbc_error;
AdbcDatabase adbc_database;
AdbcConnection adbc_connection;
AdbcStatement adbc_statement;
ArrowArrayStream arrow_stream;
然后我们可以初始化我们的数据库变量。在初始化数据库之前,我们需要设置上面提到的driver
和entrypoint
选项。然后我们设置path
选项并初始化数据库。在下面的示例中,字符串"path/to/libduckdb.dylib"
应该是DuckDB动态库的路径。在macOS上这将是.dylib
,在Linux上将是.so
。
AdbcDatabaseNew(&adbc_database, &adbc_error);
AdbcDatabaseSetOption(&adbc_database, "driver", "path/to/libduckdb.dylib", &adbc_error);
AdbcDatabaseSetOption(&adbc_database, "entrypoint", "duckdb_adbc_init", &adbc_error);
// By default, we start an in-memory database, but you can optionally define a path to store it on disk.
AdbcDatabaseSetOption(&adbc_database, "path", "test.db", &adbc_error);
AdbcDatabaseInit(&adbc_database, &adbc_error);
初始化数据库后,我们必须创建并初始化一个连接。
AdbcConnectionNew(&adbc_connection, &adbc_error);
AdbcConnectionInit(&adbc_connection, &adbc_database, &adbc_error);
我们现在可以初始化我们的语句并通过我们的连接运行查询。在AdbcStatementExecuteQuery
之后,arrow_stream
会被填充结果。
AdbcStatementNew(&adbc_connection, &adbc_statement, &adbc_error);
AdbcStatementSetSqlQuery(&adbc_statement, "SELECT 42", &adbc_error);
int64_t rows_affected;
AdbcStatementExecuteQuery(&adbc_statement, &arrow_stream, &rows_affected, &adbc_error);
arrow_stream.release(arrow_stream)
除了运行查询,我们还可以通过arrow_streams
来摄取数据。为此,我们需要设置一个选项,指定我们要插入的表名,绑定流,然后执行查询。
StatementSetOption(&adbc_statement, ADBC_INGEST_OPTION_TARGET_TABLE, "AnswerToEverything", &adbc_error);
StatementBindStream(&adbc_statement, &arrow_stream, &adbc_error);
StatementExecuteQuery(&adbc_statement, nullptr, nullptr, &adbc_error);
Python
首先要做的是使用pip
并安装ADBC驱动程序管理器。您还需要安装pyarrow
以直接访问Apache Arrow格式的结果集(例如使用fetch_arrow_table
)。
pip install adbc_driver_manager pyarrow
有关
adbc_driver_manager
包的详细信息,请参阅adbc_driver_manager
包文档。
与C++类似,我们需要提供初始化选项,包括libduckdb共享对象的位置和入口函数。请注意,DuckDB的path
参数是通过db_kwargs
字典传递的。
import adbc_driver_duckdb.dbapi
with adbc_driver_duckdb.dbapi.connect("test.db") as conn, conn.cursor() as cur:
cur.execute("SELECT 42")
# fetch a pyarrow table
tbl = cur.fetch_arrow_table()
print(tbl)
除了fetch_arrow_table
,DBApi的其他方法也在游标上实现,例如fetchone
和fetchall
。数据也可以通过arrow_streams
进行摄取。我们只需要在语句上设置选项来绑定数据流并执行查询。
import adbc_driver_duckdb.dbapi
import pyarrow
data = pyarrow.record_batch(
[[1, 2, 3, 4], ["a", "b", "c", "d"]],
names = ["ints", "strs"],
)
with adbc_driver_duckdb.dbapi.connect("test.db") as conn, conn.cursor() as cur:
cur.adbc_ingest("AnswerToEverything", data)
Go
请确保首先从发布页面下载libduckdb
库(即在Linux上的.so
,Mac上的.dylib
或Windows上的.dll
),并在运行代码之前将其放在LD_LIBRARY_PATH
上(但如果不这样做,错误信息会解释有关此文件位置的选项。)
以下示例使用内存中的DuckDB数据库通过SQL查询修改内存中的Arrow RecordBatches:
package main
import (
"bytes"
"context"
"fmt"
"io"
"github.com/apache/arrow-adbc/go/adbc"
"github.com/apache/arrow-adbc/go/adbc/drivermgr"
"github.com/apache/arrow/go/v17/arrow"
"github.com/apache/arrow/go/v17/arrow/array"
"github.com/apache/arrow/go/v17/arrow/ipc"
"github.com/apache/arrow/go/v17/arrow/memory"
)
func _makeSampleArrowRecord() arrow.Record {
b := array.NewFloat64Builder(memory.DefaultAllocator)
b.AppendValues([]float64{1, 2, 3}, nil)
col := b.NewArray()
defer col.Release()
defer b.Release()
schema := arrow.NewSchema([]arrow.Field{{Name: "column1", Type: arrow.PrimitiveTypes.Float64}}, nil)
return array.NewRecord(schema, []arrow.Array{col}, int64(col.Len()))
}
type DuckDBSQLRunner struct {
ctx context.Context
conn adbc.Connection
db adbc.Database
}
func NewDuckDBSQLRunner(ctx context.Context) (*DuckDBSQLRunner, error) {
var drv drivermgr.Driver
db, err := drv.NewDatabase(map[string]string{
"driver": "duckdb",
"entrypoint": "duckdb_adbc_init",
"path": ":memory:",
})
if err != nil {
return nil, fmt.Errorf("failed to create new in-memory DuckDB database: %w", err)
}
conn, err := db.Open(ctx)
if err != nil {
return nil, fmt.Errorf("failed to open connection to new in-memory DuckDB database: %w", err)
}
return &DuckDBSQLRunner{ctx: ctx, conn: conn, db: db}, nil
}
func serializeRecord(record arrow.Record) (io.Reader, error) {
buf := new(bytes.Buffer)
wr := ipc.NewWriter(buf, ipc.WithSchema(record.Schema()))
if err := wr.Write(record); err != nil {
return nil, fmt.Errorf("failed to write record: %w", err)
}
if err := wr.Close(); err != nil {
return nil, fmt.Errorf("failed to close writer: %w", err)
}
return buf, nil
}
func (r *DuckDBSQLRunner) importRecord(sr io.Reader) error {
rdr, err := ipc.NewReader(sr)
if err != nil {
return fmt.Errorf("failed to create IPC reader: %w", err)
}
defer rdr.Release()
stmt, err := r.conn.NewStatement()
if err != nil {
return fmt.Errorf("failed to create new statement: %w", err)
}
if err := stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeCreate); err != nil {
return fmt.Errorf("failed to set ingest mode: %w", err)
}
if err := stmt.SetOption(adbc.OptionKeyIngestTargetTable, "temp_table"); err != nil {
return fmt.Errorf("failed to set ingest target table: %w", err)
}
if err := stmt.BindStream(r.ctx, rdr); err != nil {
return fmt.Errorf("failed to bind stream: %w", err)
}
if _, err := stmt.ExecuteUpdate(r.ctx); err != nil {
return fmt.Errorf("failed to execute update: %w", err)
}
return stmt.Close()
}
func (r *DuckDBSQLRunner) runSQL(sql string) ([]arrow.Record, error) {
stmt, err := r.conn.NewStatement()
if err != nil {
return nil, fmt.Errorf("failed to create new statement: %w", err)
}
defer stmt.Close()
if err := stmt.SetSqlQuery(sql); err != nil {
return nil, fmt.Errorf("failed to set SQL query: %w", err)
}
out, n, err := stmt.ExecuteQuery(r.ctx)
if err != nil {
return nil, fmt.Errorf("failed to execute query: %w", err)
}
defer out.Release()
result := make([]arrow.Record, 0, n)
for out.Next() {
rec := out.Record()
rec.Retain() // .Next() will release the record, so we need to retain it
result = append(result, rec)
}
if out.Err() != nil {
return nil, out.Err()
}
return result, nil
}
func (r *DuckDBSQLRunner) RunSQLOnRecord(record arrow.Record, sql string) ([]arrow.Record, error) {
serializedRecord, err := serializeRecord(record)
if err != nil {
return nil, fmt.Errorf("failed to serialize record: %w", err)
}
if err := r.importRecord(serializedRecord); err != nil {
return nil, fmt.Errorf("failed to import record: %w", err)
}
result, err := r.runSQL(sql)
if err != nil {
return nil, fmt.Errorf("failed to run SQL: %w", err)
}
if _, err := r.runSQL("DROP TABLE temp_table"); err != nil {
return nil, fmt.Errorf("failed to drop temp table after running query: %w", err)
}
return result, nil
}
func (r *DuckDBSQLRunner) Close() {
r.conn.Close()
r.db.Close()
}
func main() {
rec := _makeSampleArrowRecord()
fmt.Println(rec)
runner, err := NewDuckDBSQLRunner(context.Background())
if err != nil {
panic(err)
}
defer runner.Close()
resultRecords, err := runner.RunSQLOnRecord(rec, "SELECT column1+1 FROM temp_table")
if err != nil {
panic(err)
}
for _, resultRecord := range resultRecords {
fmt.Println(resultRecord)
resultRecord.Release()
}
}
运行它会产生以下输出:
record:
schema:
fields: 1
- column1: type=float64
rows: 3
col[0][column1]: [1 2 3]
record:
schema:
fields: 1
- (column1 + 1): type=float64, nullable
rows: 3
col[0][(column1 + 1)]: [2 3 4]