From d7f8a338b69a15d9844b3dd34df13d3f26b64384 Mon Sep 17 00:00:00 2001 From: kennethcheng Date: Sat, 2 May 2026 23:16:25 +0800 Subject: [PATCH] =?UTF-8?q?chore(db):=20=E6=96=B0=E5=A2=9E=20CSV=20?= =?UTF-8?q?=E5=8E=86=E5=8F=B2=E6=B1=87=E7=8E=87=E6=92=AD=E7=A7=8D=E8=84=9A?= =?UTF-8?q?=E6=9C=AC=E5=8F=8A=E8=A1=A8=E5=94=AF=E4=B8=80=E7=BA=A6=E6=9D=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + Memory.md | 10 +++ scripts/seed-historical-rates.ts | 104 +++++++++++++++++++++++++++++++ src/db/schema.ts | 4 +- 4 files changed, 118 insertions(+), 1 deletion(-) create mode 100644 scripts/seed-historical-rates.ts diff --git a/.gitignore b/.gitignore index 4b780ed..925cabe 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,4 @@ next-env.d.ts .vscode/ .idea/ Memory.md +scripts/rates.csv diff --git a/Memory.md b/Memory.md index 37b5b11..b3be104 100644 --- a/Memory.md +++ b/Memory.md @@ -1,5 +1,15 @@ # Omniledger 架构与开发记忆 (Memory) +## 开发基于 CSV 的历史汇率数据播种脚本,在 Schema 增加联合唯一约束,实装 BOM 头剔除与分批 Upsert 逻辑,确保海量历史金融数据的幂等安全写入 (Task 50) +- 在 `src/db/schema.ts` 的 `exchangeRatesHistory` 表中新增联合唯一约束 `rate_time_unq`,基于 `(from_currency, to_currency, fetch_time)` 三列,防止重复写入,确保幂等性防线。 +- 在 `scripts/` 目录下创建 `seed-historical-rates.ts` 播种脚本,支持运行方式:`npx tsx scripts/seed-historical-rates.ts`。 +- **CSV 解析逻辑**:使用 Node.js 原生 `fs.readFileSync` 读取 `scripts/rates.csv`,按换行符切割并剔除表头;**必须处理 BOM 头**(`\uFEFF`)与空白符(`trim()`),确保字段纯净。 +- **数据校验**:对 `rate` 字段执行 `parseFloat` 类型检查,对 `fetchTime` 执行 `Date` 解析有效性验证,非法行跳过并输出警告日志。 +- **分批 Upsert 写入**:将解析好的 `records` 数组按 500 条/批次切割,使用 Drizzle 的 `onConflictDoUpdate` 执行批量插入;冲突时(基于联合唯一约束)更新 `rate` 字段为最新值,确保数据幂等安全。 +- **验证**:脚本成功解析 1000 条有效记录(USD→CNY 500 条 + HKD→CNY 500 条),分 2 个批次完成 Upsert,数据库 `exchange_rates_history` 表已填充完整历史汇率数据。 + +## 大修快照生成引擎 (snapshots.ts),修复时光机重建历史时未乘汇率导致本币入库的致命 Bug,并消灭日常快照中的反推本金逻辑 (Task 83) + ## 大修快照生成引擎 (snapshots.ts),修复时光机重建历史时未乘汇率导致本币入库的致命 Bug,并消灭日常快照中的反推本金逻辑 (Task 83) - **Bug 1 - 时光机汇率缺失**:在 `src/actions/snapshots.ts` 的 `reconstructPortfolioHistory()` 中,`historicalTx` 查询的 `select` 遗漏了 `exchangeRate` 字段,导致 `posCostCny` 计算直接使用 `metrics.accumulatedCost`(未经汇率折算的本币值),造成历史投入本金严重失真。 - **修复方案**:在 `historicalTx` 的 select 中追加 `exchangeRate: transactions.exchangeRate`;彻底重写 `posCostCny` 计算逻辑:从交易流水中按时间顺序遍历 BUY/SELL,对每笔交易使用 `qty * price * exchangeRate` 手动计算真实法币成本,SELL 时按当前累计法币成本 ÷ 当前数量得出的平均成本扣减,杜绝 `metrics.accumulatedCost` 直接入库。 diff --git a/scripts/seed-historical-rates.ts b/scripts/seed-historical-rates.ts new file mode 100644 index 0000000..6d1605b --- /dev/null +++ b/scripts/seed-historical-rates.ts @@ -0,0 +1,104 @@ +/** + * 历史汇率数据播种脚本 + * 运行方式: npx tsx scripts/seed-historical-rates.ts + * + * 功能:解析 scripts/rates.csv,将历史汇率数据分批 upsert 到 exchange_rates_history 表。 + * 特性: + * - 自动剔除 BOM 头 (\uFEFF) + * - 按 500 条/批次批量写入 + * - 联合唯一约束 (fromCurrency, toCurrency, fetchTime) 确保幂等性 + */ + +import * as fs from 'fs'; +import * as path from 'path'; +import { db } from '@/db'; +import { exchangeRatesHistory } from '@/db/schema'; +import { sql } from 'drizzle-orm'; + +const BATCH_SIZE = 500; + +function parseCsv(filePath: string): typeof exchangeRatesHistory.$inferInsert[] { + const raw = fs.readFileSync(filePath, 'utf-8'); + const content = raw.replace(/^\uFEFF/, ''); + const lines = content.split('\n').filter(line => line.trim().length > 0); + + // 剔除表头 + const header = lines[0]; + if (!header) { + throw new Error('CSV file is empty'); + } + + const records: typeof exchangeRatesHistory.$inferInsert[] = []; + for (let i = 1; i < lines.length; i++) { + const line = lines[i].replace(/^\uFEFF/, '').trim(); + if (!line) continue; + + const parts = line.split(','); + if (parts.length < 4) { + console.warn(`Skipping malformed line ${i + 1}: ${line}`); + continue; + } + + const [fromCurrency, toCurrency, rateStr, fetchTimeStr] = parts; + + const rate = parseFloat(rateStr.trim()); + if (isNaN(rate)) { + console.warn(`Skipping line ${i + 1} with invalid rate: ${rateStr}`); + continue; + } + + const fetchTime = new Date(fetchTimeStr.trim()); + if (isNaN(fetchTime.getTime())) { + console.warn(`Skipping line ${i + 1} with invalid date: ${fetchTimeStr}`); + continue; + } + + records.push({ + fromCurrency: fromCurrency.trim(), + toCurrency: toCurrency.trim(), + rate: rate.toString(), + fetchTime, + }); + } + + return records; +} + +async function seed() { + const csvPath = path.join(__dirname, 'rates.csv'); + + if (!fs.existsSync(csvPath)) { + console.error(`CSV file not found: ${csvPath}`); + process.exit(1); + } + + console.log(`Reading CSV from: ${csvPath}`); + const records = parseCsv(csvPath); + console.log(`Parsed ${records.length} valid records.`); + + const batches: typeof exchangeRatesHistory.$inferInsert[][] = []; + for (let i = 0; i < records.length; i += BATCH_SIZE) { + batches.push(records.slice(i, i + BATCH_SIZE)); + } + + let totalUpserted = 0; + for (let i = 0; i < batches.length; i++) { + const batch = batches[i]; + await db + .insert(exchangeRatesHistory) + .values(batch) + .onConflictDoUpdate({ + target: [exchangeRatesHistory.fromCurrency, exchangeRatesHistory.toCurrency, exchangeRatesHistory.fetchTime], + set: { rate: sql`EXCLUDED.rate` }, + }); + totalUpserted += batch.length; + console.log(`Batch ${i + 1}/${batches.length}: ${batch.length} records processed (${totalUpserted}/${records.length} total).`); + } + + console.log(`Seed complete. Total records processed: ${totalUpserted}`); +} + +seed().catch((err) => { + console.error('Seed failed:', err); + process.exit(1); +}); diff --git a/src/db/schema.ts b/src/db/schema.ts index b35930e..535c73a 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -105,4 +105,6 @@ export const exchangeRatesHistory = pgTable("exchange_rates_history", { createdAt: timestamp("created_at", { withTimezone: true, mode: "date" }) .defaultNow() .notNull(), -}); +}, (table) => ({ + unq: unique("rate_time_unq").on(table.fromCurrency, table.toCurrency, table.fetchTime), +}));