本页面演示了如何在多个Python线程中同时插入和读取DuckDB数据库。 这在有新数据流入并且需要定期重新运行分析的场景中可能很有用。 请注意,这一切都在单个Python进程内完成(有关DuckDB并发性的详细信息,请参阅FAQ)。 欢迎在此Google Colab笔记本中跟随操作。
设置
首先,导入DuckDB和Python标准库中的几个模块。
注意:如果使用Pandas,请在脚本顶部添加import pandas
(因为它必须在多线程之前导入)。
然后连接到文件支持的DuckDB数据库,并创建一个示例表来存储插入的数据。
该表将跟踪完成插入的线程名称,并使用DEFAULT
表达式自动插入插入时的时间戳。
import duckdb
from threading import Thread, current_thread
import random
duckdb_con = duckdb.connect('my_peristent_db.duckdb')
# Use connect without parameters for an in-memory database
# duckdb_con = duckdb.connect()
duckdb_con.execute("""
CREATE OR REPLACE TABLE my_inserts (
thread_name VARCHAR,
insert_time TIMESTAMP DEFAULT current_timestamp
)
""")
读取和写入函数
接下来,定义由写入线程和读取线程执行的函数。
每个线程必须使用.cursor()
方法基于原始连接创建到同一DuckDB文件的线程本地连接。
这种方法也适用于内存中的DuckDB数据库。
def write_from_thread(duckdb_con):
# Create a DuckDB connection specifically for this thread
local_con = duckdb_con.cursor()
# Insert a row with the name of the thread. insert_time is auto-generated.
thread_name = str(current_thread().name)
result = local_con.execute("""
INSERT INTO my_inserts (thread_name)
VALUES (?)
""", (thread_name,)).fetchall()
def read_from_thread(duckdb_con):
# Create a DuckDB connection specifically for this thread
local_con = duckdb_con.cursor()
# Query the current row count
thread_name = str(current_thread().name)
results = local_con.execute("""
SELECT
? AS thread_name,
count(*) AS row_counter,
current_timestamp
FROM my_inserts
""", (thread_name,)).fetchall()
print(results)
创建线程
我们定义要使用多少写者和读者,并定义一个列表来跟踪将创建的所有线程。 然后,首先创建写者线程,然后创建读者线程。 接下来,打乱它们的顺序,以便以随机顺序启动它们,以模拟同时进行的写者和读者。 请注意,线程尚未执行,仅被定义。
write_thread_count = 50
read_thread_count = 5
threads = []
# Create multiple writer and reader threads (in the same process)
# Pass in the same connection as an argument
for i in range(write_thread_count):
threads.append(Thread(target = write_from_thread,
args = (duckdb_con,),
name = 'write_thread_' + str(i)))
for j in range(read_thread_count):
threads.append(Thread(target = read_from_thread,
args = (duckdb_con,),
name = 'read_thread_' + str(j)))
# Shuffle the threads to simulate a mix of readers and writers
random.seed(6) # Set the seed to ensure consistent results when testing
random.shuffle(threads)
运行线程并显示结果
现在,启动所有线程以并行运行,然后等待所有线程完成后再打印结果。 请注意,由于随机化,读者和作者的时间戳如预期般交错。
# Kick off all threads in parallel
for thread in threads:
thread.start()
# Ensure all threads complete before printing final results
for thread in threads:
thread.join()
print(duckdb_con.execute("""
SELECT *
FROM my_inserts
ORDER BY
insert_time
""").df())