Data Ingestion
DuckDB-Wasm offers several ways to import data, depending on the data format.
Importing data into DuckDB is a two-step process.
First, stage the data file in the local file system with one of the register functions (registerEmptyFileBuffer, registerFileBuffer, registerFileHandle, registerFileText, registerFileURL).
Then, load it into DuckDB with one of the insert functions (insertArrowFromIPCStream, insertArrowTable, insertCSVFromPath, insertJSONFromPath), or query it directly with a FROM SQL clause (using extensions such as Parquet or the Wasm-flavored httpfs).
Insert statements can also be used to import data.
Open & Close Connection
// Create a new connection
const c = await db.connect();
// ... import data
// Close the connection to release memory
await c.close();
Apache Arrow
// Data can be inserted from an existing arrow.Table
// More examples: https://arrow.apache.org/docs/js/
import { tableFromArrays } from 'apache-arrow';
// EOS signal according to Arrow IPC streaming format
// See https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
const EOS = new Uint8Array([255, 255, 255, 255, 0, 0, 0, 0]);
const arrowTable = tableFromArrays({
id: [1, 2, 3],
name: ['John', 'Jane', 'Jack'],
age: [20, 21, 22],
});
await c.insertArrowTable(arrowTable, { name: 'arrow_table' });
// Write EOS
await c.insertArrowTable(EOS, { name: 'arrow_table' });
// ..., from a raw Arrow IPC stream
const streamResponse = await fetch(`someapi`);
const streamReader = streamResponse.body.getReader();
const streamInserts = [];
while (true) {
const { value, done } = await streamReader.read();
if (done) break;
streamInserts.push(c.insertArrowFromIPCStream(value, { name: 'streamed' }));
}
// Write EOS
streamInserts.push(c.insertArrowFromIPCStream(EOS, { name: 'streamed' }));
await Promise.all(streamInserts);
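The EOS marker used above follows the Arrow IPC streaming format: a 0xFFFFFFFF continuation marker followed by a zero metadata length, both as little-endian 32-bit integers. A small hypothetical helper makes that check explicit:

```typescript
// The Arrow IPC stream terminator: 0xFFFFFFFF continuation marker followed
// by a 0 metadata length, both little-endian 32-bit integers.
const EOS = new Uint8Array([255, 255, 255, 255, 0, 0, 0, 0]);

// Hypothetical helper: true if a chunk is exactly the end-of-stream marker.
function isEos(chunk: Uint8Array): boolean {
  if (chunk.byteLength !== 8) return false;
  const view = new DataView(chunk.buffer, chunk.byteOffset, chunk.byteLength);
  return view.getUint32(0, true) === 0xffffffff && view.getUint32(4, true) === 0;
}
```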
CSV
// ..., from CSV files
// (interchangeable: registerFile{Text,Buffer,URL,Handle})
const csvContent = '1|foo\n2|bar\n';
await db.registerFileText(`data.csv`, csvContent);
// ... with typed insert options
await c.insertCSVFromPath('data.csv', {
schema: 'main',
name: 'foo',
detect: false,
header: false,
delimiter: '|',
columns: {
col1: new arrow.Int32(),
col2: new arrow.Utf8(),
},
});
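For programmatically generated data, the registered text can be built from rows first. A hypothetical helper matching the pipe-delimited, headerless layout above (no quoting or escaping; illustration only):

```typescript
// Hypothetical helper: serialize [id, label] rows into pipe-delimited,
// headerless CSV text. No quoting/escaping is performed.
function toCsv(rows: Array<[number, string]>, delimiter = '|'): string {
  return rows.map(([id, label]) => `${id}${delimiter}${label}`).join('\n') + '\n';
}

const generatedCsv = toCsv([[1, 'foo'], [2, 'bar']]);
// → '1|foo\n2|bar\n'
```

The resulting string can then be passed to db.registerFileText as above.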
JSON
// ..., from JSON documents in row-major format
const jsonRowContent = [
{ "col1": 1, "col2": "foo" },
{ "col1": 2, "col2": "bar" },
];
await db.registerFileText(
'rows.json',
JSON.stringify(jsonRowContent),
);
await c.insertJSONFromPath('rows.json', { name: 'rows' });
// ... or column-major format
const jsonColContent = {
"col1": [1, 2],
"col2": ["foo", "bar"]
};
await db.registerFileText(
'columns.json',
JSON.stringify(jsonColContent),
);
await c.insertJSONFromPath('columns.json', { name: 'columns' });
// From API
const streamResponse = await fetch(`someapi/content.json`);
await db.registerFileBuffer('file.json', new Uint8Array(await streamResponse.arrayBuffer()));
await c.insertJSONFromPath('file.json', { name: 'JSONContent' });
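Both shapes carry the same data; pivoting row-major records into the column-major form can be sketched with a hypothetical helper:

```typescript
// Hypothetical helper: pivot row-major records into the column-major shape.
function rowsToColumns(rows: Record<string, unknown>[]): Record<string, unknown[]> {
  const columns: Record<string, unknown[]> = {};
  for (const row of rows) {
    for (const [key, value] of Object.entries(row)) {
      // Create the column array on first sight of the key, then append.
      (columns[key] ??= []).push(value);
    }
  }
  return columns;
}
```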
Parquet
// from Parquet files
// ...Local
const pickedFile: File = letUserPickFile();
await db.registerFileHandle('local.parquet', pickedFile, DuckDBDataProtocol.BROWSER_FILEREADER, true);
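letUserPickFile() above is a placeholder. One possible browser-only implementation (hypothetical), backed by a temporary file input element:

```typescript
// Hypothetical implementation of the letUserPickFile() placeholder:
// resolves with the File the user selects in the browser's file dialog.
function letUserPickFile(): Promise<File> {
  return new Promise((resolve, reject) => {
    const input = document.createElement('input');
    input.type = 'file';
    input.accept = '.parquet';
    input.onchange = () => {
      const file = input.files?.[0];
      if (file) resolve(file);
      else reject(new Error('no file selected'));
    };
    input.click();
  });
}
```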
// ...Remote
await db.registerFileURL('remote.parquet', 'https://origin/remote.parquet', DuckDBDataProtocol.HTTP, false);
// ... Using Fetch
const res = await fetch('https://origin/remote.parquet');
await db.registerFileBuffer('buffer.parquet', new Uint8Array(await res.arrayBuffer()));
// ..., by specifying URLs in the SQL text
await c.query(`
CREATE TABLE direct AS
SELECT * FROM 'https://origin/remote.parquet'
`);
// ..., or by executing raw insert statements
await c.query(`
INSERT INTO existing_table
VALUES (1, 'foo'), (2, 'bar')`);
httpfs (Wasm-flavored)
// ..., by specifying URLs in the SQL text
await c.query(`
CREATE TABLE direct AS
SELECT * FROM 'https://origin/remote.parquet'
`);
Tip: If you hit a network error when trying to query files from S3 (Failed to execute 'send' on 'XMLHttpRequest'), configure the CORS headers in your S3 bucket permissions. For example:
[
{
"AllowedHeaders": [
"*"
],
"AllowedMethods": [
"GET",
"HEAD"
],
"AllowedOrigins": [
"*"
],
"ExposeHeaders": [],
"MaxAgeSeconds": 3000
}
]
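For private objects, DuckDB's S3 settings can be supplied on the connection before querying. A sketch, where the region, credential values, and bucket name are placeholders:

```typescript
// Sketch: configure DuckDB's S3 settings on a connection before querying a
// private object. Region, credentials, and bucket name are placeholders.
async function queryPrivateS3(c: { query(sql: string): Promise<unknown> }) {
  await c.query(`SET s3_region='us-east-1'`);
  await c.query(`SET s3_access_key_id='AKIA...'`);
  await c.query(`SET s3_secret_access_key='...'`);
  return c.query(`SELECT * FROM 's3://my-bucket/remote.parquet'`);
}
```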
Insert Statements
// ..., or by executing raw insert statements
await c.query(`
INSERT INTO existing_table
VALUES (1, 'foo'), (2, 'bar')`);
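For repeated inserts, duckdb-wasm connections also expose prepared statements, which avoid building SQL strings by hand. A hedged sketch; the table name and row shape are placeholders:

```typescript
// Sketch: parameterized inserts through a prepared statement. The structural
// type on `c` stands in for duckdb-wasm's AsyncDuckDBConnection.
async function insertRows(
  c: {
    prepare(sql: string): Promise<{
      query(...args: unknown[]): Promise<unknown>;
      close(): Promise<void>;
    }>;
  },
  rows: Array<[number, string]>,
) {
  const stmt = await c.prepare('INSERT INTO existing_table VALUES (?, ?)');
  for (const [id, name] of rows) {
    await stmt.query(id, name);
  }
  await stmt.close();
}
```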