- Installation
- Documentation
- Getting Started
- Connect
- Data Import
- Overview
- CSV Files
- JSON Files
- Multiple Files
- Parquet Files
- Partitioning
- Appender
- INSERT Statements
- Client APIs
- Overview
- C
- Overview
- Startup
- Configuration
- Query
- Data Chunks
- Vectors
- Values
- Types
- Prepared Statements
- Appender
- Table Functions
- Replacement Scans
- API Reference
- C++
- CLI
- Go
- Java
- Julia
- Node.js
- Python
- Overview
- Data Ingestion
- Conversion between DuckDB and Python
- DB API
- Relational API
- Function API
- Types API
- Expression API
- Spark API
- API Reference
- Known Python Issues
- R
- Rust
- Swift
- Wasm
- ADBC
- ODBC
- SQL
- Introduction
- Statements
- Overview
- ANALYZE
- ALTER TABLE
- ALTER VIEW
- ATTACH/DETACH
- CALL
- CHECKPOINT
- COMMENT ON
- COPY
- CREATE INDEX
- CREATE MACRO
- CREATE SCHEMA
- CREATE SECRET
- CREATE SEQUENCE
- CREATE TABLE
- CREATE VIEW
- CREATE TYPE
- DELETE
- DESCRIBE
- DROP
- EXPORT/IMPORT DATABASE
- INSERT
- PIVOT
- Profiling
- SELECT
- SET/RESET
- SUMMARIZE
- Transaction Management
- UNPIVOT
- UPDATE
- USE
- VACUUM
- Query Syntax
- SELECT
- FROM & JOIN
- WHERE
- GROUP BY
- GROUPING SETS
- HAVING
- ORDER BY
- LIMIT and OFFSET
- SAMPLE
- Unnesting
- WITH
- WINDOW
- QUALIFY
- VALUES
- FILTER
- Set Operations
- Prepared Statements
- Data Types
- Overview
- Array
- Bitstring
- Blob
- Boolean
- Date
- Enum
- Interval
- List
- Literal Types
- Map
- NULL Values
- Numeric
- Struct
- Text
- Time
- Timestamp
- Time Zones
- Union
- Typecasting
- Expressions
- Overview
- CASE Statement
- Casting
- Collations
- Comparisons
- IN Operator
- Logical Operators
- Star Expression
- Subqueries
- Functions
- Overview
- Aggregate Functions
- Array Functions
- Bitstring Functions
- Blob Functions
- Date Format Functions
- Date Functions
- Date Part Functions
- Enum Functions
- Interval Functions
- Lambda Functions
- List Functions
- Map Functions
- Nested Functions
- Numeric Functions
- Pattern Matching
- Regular Expressions
- Struct Functions
- Text Functions
- Time Functions
- Timestamp Functions
- Timestamp with Time Zone Functions
- Union Functions
- Utility Functions
- Window Functions
- Constraints
- Indexes
- Meta Queries
- DuckDB's SQL Dialect
- Samples
- Configuration
- Extensions
- Overview
- Core Extensions
- Community Extensions
- Working with Extensions
- Versioning of Extensions
- Arrow
- AutoComplete
- AWS
- Azure
- Delta
- Excel
- Full Text Search
- httpfs (HTTP and S3)
- Iceberg
- ICU
- inet
- jemalloc
- JSON
- MySQL
- PostgreSQL
- Spatial
- SQLite
- Substrait
- TPC-DS
- TPC-H
- VSS
- Guides
- Overview
- Data Viewers
- Database Integration
- File Formats
- Overview
- CSV Import
- CSV Export
- Directly Reading Files
- Excel Import
- Excel Export
- JSON Import
- JSON Export
- Parquet Import
- Parquet Export
- Querying Parquet Files
- Network & Cloud Storage
- Overview
- HTTP Parquet Import
- S3 Parquet Import
- S3 Parquet Export
- S3 Iceberg Import
- S3 Express One
- GCS Import
- Cloudflare R2 Import
- DuckDB over HTTPS/S3
- Meta Queries
- Describe Table
- EXPLAIN: Inspect Query Plans
- EXPLAIN ANALYZE: Profile Queries
- List Tables
- Summarize
- DuckDB Environment
- ODBC
- Performance
- Overview
- Import
- Schema
- Indexing
- Environment
- File Formats
- How to Tune Workloads
- My Workload Is Slow
- Benchmarks
- Python
- Installation
- Executing SQL
- Jupyter Notebooks
- SQL on Pandas
- Import from Pandas
- Export to Pandas
- Import from Numpy
- Export to Numpy
- SQL on Arrow
- Import from Arrow
- Export to Arrow
- Relational API on Pandas
- Multiple Python Threads
- Integration with Ibis
- Integration with Polars
- Using fsspec Filesystems
- SQL Editors
- SQL Features
- Snippets
- Glossary of Terms
- Browse Offline
- Operations Manual
- Overview
- Limits
- Non-Deterministic Behavior
- DuckDB's Footprint
- Securing DuckDB
- Development
- DuckDB Repositories
- Testing
- Overview
- sqllogictest Introduction
- Writing Tests
- Debugging
- Result Verification
- Persistent Testing
- Loops
- Multiple Connections
- Catch
- Profiling
- Release Calendar
- Building
- Overview
- Build Instructions
- Build Configuration
- Building Extensions
- Supported Platforms
- Troubleshooting
- Benchmark Suite
- Internals
- Sitemap
- Why DuckDB
- Media
- FAQ
- Code of Conduct
- Live Demo
This page demonstrates how to simultaneously insert into and read from a DuckDB database across multiple Python threads. This could be useful in scenarios where new data is flowing in and an analysis should be periodically re-run. Note that this is all within a single Python process (see the FAQ for details on DuckDB concurrency). Feel free to follow along in this Google Colab notebook.
Setup
First, import DuckDB and several modules from the Python standard library.
Note: if using Pandas, add import pandas
at the top of the script as well (as it must be imported prior to the multi-threading).
Then connect to a file-backed DuckDB database and create an example table to store inserted data.
This table will track the name of the thread that completed the insert and automatically insert the timestamp when that insert occurred using the DEFAULT
expression.
import duckdb
from threading import Thread, current_thread
import random
duckdb_con = duckdb.connect('my_peristent_db.duckdb')
# Use connect without parameters for an in-memory database
# duckdb_con = duckdb.connect()
duckdb_con.execute("""
CREATE OR REPLACE TABLE my_inserts (
thread_name VARCHAR,
insert_time TIMESTAMP DEFAULT current_timestamp
)
""")
Reader and Writer Functions
Next, define functions to be executed by the writer and reader threads.
Each thread must use the .cursor()
method to create a thread-local connection to the same DuckDB file based on the original connection.
This approach also works with in-memory DuckDB databases.
def write_from_thread(duckdb_con):
# Create a DuckDB connection specifically for this thread
local_con = duckdb_con.cursor()
# Insert a row with the name of the thread. insert_time is auto-generated.
thread_name = str(current_thread().name)
result = local_con.execute("""
INSERT INTO my_inserts (thread_name)
VALUES (?)
""", (thread_name,)).fetchall()
def read_from_thread(duckdb_con):
# Create a DuckDB connection specifically for this thread
local_con = duckdb_con.cursor()
# Query the current row count
thread_name = str(current_thread().name)
results = local_con.execute("""
SELECT
? AS thread_name,
count(*) AS row_counter,
current_timestamp
FROM my_inserts
""", (thread_name,)).fetchall()
print(results)
Create Threads
We define how many writers and readers to use, and define a list to track all of the threads that will be created. Then, create first writer and then reader threads. Next, shuffle them so that they will be kicked off in a random order to simulate simultaneous writers and readers. Note that the threads have not yet been executed, only defined.
write_thread_count = 50
read_thread_count = 5
threads = []
# Create multiple writer and reader threads (in the same process)
# Pass in the same connection as an argument
for i in range(write_thread_count):
threads.append(Thread(target = write_from_thread,
args = (duckdb_con,),
name = 'write_thread_' + str(i)))
for j in range(read_thread_count):
threads.append(Thread(target = read_from_thread,
args = (duckdb_con,),
name = 'read_thread_' + str(j)))
# Shuffle the threads to simulate a mix of readers and writers
random.seed(6) # Set the seed to ensure consistent results when testing
random.shuffle(threads)
Run Threads and Show Results
Now, kick off all threads to run in parallel, then wait for all of them to finish before printing out the results. Note that the timestamps of readers and writers are interspersed as expected due to the randomization.
# Kick off all threads in parallel
for thread in threads:
thread.start()
# Ensure all threads complete before printing final results
for thread in threads:
thread.join()
print(duckdb_con.execute("""
SELECT *
FROM my_inserts
ORDER BY
insert_time
""").df())