chore(db): 新增 CSV 历史汇率播种脚本及表唯一约束
This commit is contained in:
parent
189266c5e3
commit
d7f8a338b6
1
.gitignore
vendored
1
.gitignore
vendored
@ -43,3 +43,4 @@ next-env.d.ts
|
|||||||
.vscode/
|
.vscode/
|
||||||
.idea/
|
.idea/
|
||||||
Memory.md
|
Memory.md
|
||||||
|
scripts/rates.csv
|
||||||
|
|||||||
10
Memory.md
10
Memory.md
@ -1,5 +1,15 @@
|
|||||||
# Omniledger 架构与开发记忆 (Memory)
|
# Omniledger 架构与开发记忆 (Memory)
|
||||||
|
|
||||||
|
## 开发基于 CSV 的历史汇率数据播种脚本,在 Schema 增加联合唯一约束,实装 BOM 头剔除与分批 Upsert 逻辑,确保海量历史金融数据的幂等安全写入 (Task 50)
|
||||||
|
- 在 `src/db/schema.ts` 的 `exchangeRatesHistory` 表中新增联合唯一约束 `rate_time_unq`,基于 `(from_currency, to_currency, fetch_time)` 三列,防止重复写入,确保幂等性防线。
|
||||||
|
- 在 `scripts/` 目录下创建 `seed-historical-rates.ts` 播种脚本,支持运行方式:`npx tsx scripts/seed-historical-rates.ts`。
|
||||||
|
- **CSV 解析逻辑**:使用 Node.js 原生 `fs.readFileSync` 读取 `scripts/rates.csv`,按换行符切割并剔除表头;**必须处理 BOM 头**(`\uFEFF`)与空白符(`trim()`),确保字段纯净。
|
||||||
|
- **数据校验**:对 `rate` 字段执行 `parseFloat` 类型检查,对 `fetchTime` 执行 `Date` 解析有效性验证,非法行跳过并输出警告日志。
|
||||||
|
- **分批 Upsert 写入**:将解析好的 `records` 数组按 500 条/批次切割,使用 Drizzle 的 `onConflictDoUpdate` 执行批量插入;冲突时(基于联合唯一约束)更新 `rate` 字段为最新值,确保数据幂等安全。
|
||||||
|
- **验证**:脚本成功解析 1000 条有效记录(USD→CNY 500 条 + HKD→CNY 500 条),分 2 个批次完成 Upsert,数据库 `exchange_rates_history` 表已填充完整历史汇率数据。
|
||||||
|
|
||||||
|
## 大修快照生成引擎 (snapshots.ts),修复时光机重建历史时未乘汇率导致本币入库的致命 Bug,并消灭日常快照中的反推本金逻辑 (Task 83)
|
||||||
|
|
||||||
## 大修快照生成引擎 (snapshots.ts),修复时光机重建历史时未乘汇率导致本币入库的致命 Bug,并消灭日常快照中的反推本金逻辑 (Task 83)
|
## 大修快照生成引擎 (snapshots.ts),修复时光机重建历史时未乘汇率导致本币入库的致命 Bug,并消灭日常快照中的反推本金逻辑 (Task 83)
|
||||||
- **Bug 1 - 时光机汇率缺失**:在 `src/actions/snapshots.ts` 的 `reconstructPortfolioHistory()` 中,`historicalTx` 查询的 `select` 遗漏了 `exchangeRate` 字段,导致 `posCostCny` 计算直接使用 `metrics.accumulatedCost`(未经汇率折算的本币值),造成历史投入本金严重失真。
|
- **Bug 1 - 时光机汇率缺失**:在 `src/actions/snapshots.ts` 的 `reconstructPortfolioHistory()` 中,`historicalTx` 查询的 `select` 遗漏了 `exchangeRate` 字段,导致 `posCostCny` 计算直接使用 `metrics.accumulatedCost`(未经汇率折算的本币值),造成历史投入本金严重失真。
|
||||||
- **修复方案**:在 `historicalTx` 的 select 中追加 `exchangeRate: transactions.exchangeRate`;彻底重写 `posCostCny` 计算逻辑:从交易流水中按时间顺序遍历 BUY/SELL,对每笔交易使用 `qty * price * exchangeRate` 手动计算真实法币成本,SELL 时按当前累计法币成本 ÷ 当前数量得出的平均成本扣减,杜绝 `metrics.accumulatedCost` 直接入库。
|
- **修复方案**:在 `historicalTx` 的 select 中追加 `exchangeRate: transactions.exchangeRate`;彻底重写 `posCostCny` 计算逻辑:从交易流水中按时间顺序遍历 BUY/SELL,对每笔交易使用 `qty * price * exchangeRate` 手动计算真实法币成本,SELL 时按当前累计法币成本 ÷ 当前数量得出的平均成本扣减,杜绝 `metrics.accumulatedCost` 直接入库。
|
||||||
|
|||||||
104
scripts/seed-historical-rates.ts
Normal file
104
scripts/seed-historical-rates.ts
Normal file
@ -0,0 +1,104 @@
|
|||||||
|
/**
|
||||||
|
* 历史汇率数据播种脚本
|
||||||
|
* 运行方式: npx tsx scripts/seed-historical-rates.ts
|
||||||
|
*
|
||||||
|
* 功能:解析 scripts/rates.csv,将历史汇率数据分批 upsert 到 exchange_rates_history 表。
|
||||||
|
* 特性:
|
||||||
|
* - 自动剔除 BOM 头 (\uFEFF)
|
||||||
|
* - 按 500 条/批次批量写入
|
||||||
|
* - 联合唯一约束 (fromCurrency, toCurrency, fetchTime) 确保幂等性
|
||||||
|
*/
|
||||||
|
|
||||||
|
import * as fs from 'fs';
|
||||||
|
import * as path from 'path';
|
||||||
|
import { db } from '@/db';
|
||||||
|
import { exchangeRatesHistory } from '@/db/schema';
|
||||||
|
import { sql } from 'drizzle-orm';
|
||||||
|
|
||||||
|
const BATCH_SIZE = 500;
|
||||||
|
|
||||||
|
function parseCsv(filePath: string): typeof exchangeRatesHistory.$inferInsert[] {
|
||||||
|
const raw = fs.readFileSync(filePath, 'utf-8');
|
||||||
|
const content = raw.replace(/^\uFEFF/, '');
|
||||||
|
const lines = content.split('\n').filter(line => line.trim().length > 0);
|
||||||
|
|
||||||
|
// 剔除表头
|
||||||
|
const header = lines[0];
|
||||||
|
if (!header) {
|
||||||
|
throw new Error('CSV file is empty');
|
||||||
|
}
|
||||||
|
|
||||||
|
const records: typeof exchangeRatesHistory.$inferInsert[] = [];
|
||||||
|
for (let i = 1; i < lines.length; i++) {
|
||||||
|
const line = lines[i].replace(/^\uFEFF/, '').trim();
|
||||||
|
if (!line) continue;
|
||||||
|
|
||||||
|
const parts = line.split(',');
|
||||||
|
if (parts.length < 4) {
|
||||||
|
console.warn(`Skipping malformed line ${i + 1}: ${line}`);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const [fromCurrency, toCurrency, rateStr, fetchTimeStr] = parts;
|
||||||
|
|
||||||
|
const rate = parseFloat(rateStr.trim());
|
||||||
|
if (isNaN(rate)) {
|
||||||
|
console.warn(`Skipping line ${i + 1} with invalid rate: ${rateStr}`);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const fetchTime = new Date(fetchTimeStr.trim());
|
||||||
|
if (isNaN(fetchTime.getTime())) {
|
||||||
|
console.warn(`Skipping line ${i + 1} with invalid date: ${fetchTimeStr}`);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
records.push({
|
||||||
|
fromCurrency: fromCurrency.trim(),
|
||||||
|
toCurrency: toCurrency.trim(),
|
||||||
|
rate: rate.toString(),
|
||||||
|
fetchTime,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return records;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function seed() {
|
||||||
|
const csvPath = path.join(__dirname, 'rates.csv');
|
||||||
|
|
||||||
|
if (!fs.existsSync(csvPath)) {
|
||||||
|
console.error(`CSV file not found: ${csvPath}`);
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log(`Reading CSV from: ${csvPath}`);
|
||||||
|
const records = parseCsv(csvPath);
|
||||||
|
console.log(`Parsed ${records.length} valid records.`);
|
||||||
|
|
||||||
|
const batches: typeof exchangeRatesHistory.$inferInsert[][] = [];
|
||||||
|
for (let i = 0; i < records.length; i += BATCH_SIZE) {
|
||||||
|
batches.push(records.slice(i, i + BATCH_SIZE));
|
||||||
|
}
|
||||||
|
|
||||||
|
let totalUpserted = 0;
|
||||||
|
for (let i = 0; i < batches.length; i++) {
|
||||||
|
const batch = batches[i];
|
||||||
|
await db
|
||||||
|
.insert(exchangeRatesHistory)
|
||||||
|
.values(batch)
|
||||||
|
.onConflictDoUpdate({
|
||||||
|
target: [exchangeRatesHistory.fromCurrency, exchangeRatesHistory.toCurrency, exchangeRatesHistory.fetchTime],
|
||||||
|
set: { rate: sql`EXCLUDED.rate` },
|
||||||
|
});
|
||||||
|
totalUpserted += batch.length;
|
||||||
|
console.log(`Batch ${i + 1}/${batches.length}: ${batch.length} records processed (${totalUpserted}/${records.length} total).`);
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log(`Seed complete. Total records processed: ${totalUpserted}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
seed().catch((err) => {
|
||||||
|
console.error('Seed failed:', err);
|
||||||
|
process.exit(1);
|
||||||
|
});
|
||||||
@ -105,4 +105,6 @@ export const exchangeRatesHistory = pgTable("exchange_rates_history", {
|
|||||||
createdAt: timestamp("created_at", { withTimezone: true, mode: "date" })
|
createdAt: timestamp("created_at", { withTimezone: true, mode: "date" })
|
||||||
.defaultNow()
|
.defaultNow()
|
||||||
.notNull(),
|
.notNull(),
|
||||||
});
|
}, (table) => ({
|
||||||
|
unq: unique("rate_time_unq").on(table.fromCurrency, table.toCurrency, table.fetchTime),
|
||||||
|
}));
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user