DuckDB Julia 包为 DuckDB 提供了一个高性能的前端。与 SQLite 类似,DuckDB 在 Julia 客户端内进程中运行,并提供了一个 DBInterface 前端。
该包还支持多线程执行。它使用Julia线程/任务来实现这一目的。如果您希望并行运行查询,您必须启动支持多线程的Julia(例如,通过设置JULIA_NUM_THREADS
环境变量)。
Installation
按如下方式安装DuckDB:
using Pkg
Pkg.add("DuckDB")
或者,使用 ]
键进入包管理器,并发出以下命令:
pkg> add DuckDB
基础
using DuckDB
# create a new in-memory database
con = DBInterface.connect(DuckDB.DB, ":memory:")
# create a table
DBInterface.execute(con, "CREATE TABLE integers (i INTEGER)")
# insert data by executing a prepared statement
stmt = DBInterface.prepare(con, "INSERT INTO integers VALUES(?)")
DBInterface.execute(stmt, [42])
# query the database
results = DBInterface.execute(con, "SELECT 42 a")
print(results)
一些SQL语句,例如PIVOT和IMPORT DATABASE,会作为多个预准备语句执行,在使用DuckDB.execute()
时会出错。相反,它们可以使用DuckDB.query()
而不是DuckDB.execute()
来运行,并且总是会返回一个具体化的结果。
扫描数据框
DuckDB Julia 包还提供了对查询 Julia DataFrames 的支持。请注意,DataFrames 是直接由 DuckDB 读取的——它们不会被插入或复制到数据库本身。
如果您希望将数据从DataFrame加载到DuckDB表中,您可以运行CREATE TABLE ... AS
或INSERT INTO
查询。
using DuckDB
using DataFrames
# create a new in-memory dabase
con = DBInterface.connect(DuckDB.DB)
# create a DataFrame
df = DataFrame(a = [1, 2, 3], b = [42, 84, 42])
# register it as a view in the database
DuckDB.register_data_frame(con, df, "my_df")
# run a SQL query over the DataFrame
results = DBInterface.execute(con, "SELECT * FROM my_df")
print(results)
Appender API
DuckDB Julia 包还支持 Appender API,它比使用预处理语句或单独的 INSERT INTO
语句要快得多。追加是以行格式进行的。对于每一列,应该调用 append()
,然后通过调用 flush()
来完成该行。在所有行都追加完毕后,应该使用 close()
来最终化 Appender 并清理生成的内存。
using DuckDB, DataFrames, Dates
db = DuckDB.DB()
# create a table
DBInterface.execute(db,
"CREATE OR REPLACE TABLE data(id INTEGER PRIMARY KEY, value FLOAT, timestamp TIMESTAMP, date DATE)")
# create data to insert
len = 100
df = DataFrames.DataFrame(
id = collect(1:len),
value = rand(len),
timestamp = Dates.now() + Dates.Second.(1:len),
date = Dates.today() + Dates.Day.(1:len)
)
# append data by row
appender = DuckDB.Appender(db, "data")
for i in eachrow(df)
for j in i
DuckDB.append(appender, j)
end
DuckDB.end_row(appender)
end
# close the appender after all rows
DuckDB.close(appender)
Concurrency
在Julia进程中,只要每个任务保持自己的数据库连接,任务就能够并发地读取和写入数据库。在下面的示例中,生成一个任务来定期读取数据库,并生成许多任务来使用INSERT
语句以及Appender API写入数据库。
using Dates, DataFrames, DuckDB
db = DuckDB.DB()
DBInterface.connect(db)
DBInterface.execute(db, "CREATE OR REPLACE TABLE data (date TIMESTAMP, id INTEGER)")
function run_reader(db)
# create a DuckDB connection specifically for this task
conn = DBInterface.connect(db)
while true
println(DBInterface.execute(conn,
"SELECT id, count(date) AS count, max(date) AS max_date
FROM data GROUP BY id ORDER BY id") |> DataFrames.DataFrame)
Threads.sleep(1)
end
DBInterface.close(conn)
end
# spawn one reader task
Threads.@spawn run_reader(db)
function run_inserter(db, id)
# create a DuckDB connection specifically for this task
conn = DBInterface.connect(db)
for i in 1:1000
Threads.sleep(0.01)
DuckDB.execute(conn, "INSERT INTO data VALUES (current_timestamp, ?)"; id);
end
DBInterface.close(conn)
end
# spawn many insert tasks
for i in 1:100
Threads.@spawn run_inserter(db, 1)
end
function run_appender(db, id)
# create a DuckDB connection specifically for this task
appender = DuckDB.Appender(db, "data")
for i in 1:1000
Threads.sleep(0.01)
row = (Dates.now(Dates.UTC), id)
for j in row
DuckDB.append(appender, j);
end
DuckDB.end_row(appender);
end
DuckDB.close(appender);
end
# spawn many appender tasks
for i in 1:100
Threads.@spawn run_appender(db, 2)
end
Original Julia Connector
感谢kimmolinna提供的原始DuckDB Julia连接器。