Compare commits
3 Commits
89b40a72bb
...
ef412b366a
| Author | SHA1 | Date | |
|---|---|---|---|
| ef412b366a | |||
| d7f8a338b6 | |||
| 189266c5e3 |
1
.gitignore
vendored
1
.gitignore
vendored
@ -43,3 +43,4 @@ next-env.d.ts
|
||||
.vscode/
|
||||
.idea/
|
||||
Memory.md
|
||||
scripts/rates.csv
|
||||
|
||||
27
Memory.md
27
Memory.md
@ -1,5 +1,22 @@
|
||||
# Omniledger 架构与开发记忆 (Memory)
|
||||
|
||||
## 开发基于 CSV 的历史汇率数据播种脚本,在 Schema 增加联合唯一约束,实装 BOM 头剔除与分批 Upsert 逻辑,确保海量历史金融数据的幂等安全写入 (Task 50)
|
||||
- 在 `src/db/schema.ts` 的 `exchangeRatesHistory` 表中新增联合唯一约束 `rate_time_unq`,基于 `(from_currency, to_currency, fetch_time)` 三列,防止重复写入,确保幂等性防线。
|
||||
- 在 `scripts/` 目录下创建 `seed-historical-rates.ts` 播种脚本,支持运行方式:`npx tsx scripts/seed-historical-rates.ts`。
|
||||
- **CSV 解析逻辑**:使用 Node.js 原生 `fs.readFileSync` 读取 `scripts/rates.csv`,按换行符切割并剔除表头;**必须处理 BOM 头**(`\uFEFF`)与空白符(`trim()`),确保字段纯净。
|
||||
- **数据校验**:对 `rate` 字段执行 `parseFloat` 类型检查,对 `fetchTime` 执行 `Date` 解析有效性验证,非法行跳过并输出警告日志。
|
||||
- **分批 Upsert 写入**:将解析好的 `records` 数组按 500 条/批次切割,使用 Drizzle 的 `onConflictDoUpdate` 执行批量插入;冲突时(基于联合唯一约束)更新 `rate` 字段为最新值,确保数据幂等安全。
|
||||
- **验证**:脚本成功解析 1000 条有效记录(USD→CNY 500 条 + HKD→CNY 500 条),分 2 个批次完成 Upsert,数据库 `exchange_rates_history` 表已填充完整历史汇率数据。
|
||||
|
||||
## 大修快照生成引擎 (snapshots.ts),修复时光机重建历史时未乘汇率导致本币入库的致命 Bug,并消灭日常快照中的反推本金逻辑 (Task 83)
|
||||
|
||||
## 大修快照生成引擎 (snapshots.ts),修复时光机重建历史时未乘汇率导致本币入库的致命 Bug,并消灭日常快照中的反推本金逻辑 (Task 83)
|
||||
- **Bug 1 - 时光机汇率缺失**:在 `src/actions/snapshots.ts` 的 `reconstructPortfolioHistory()` 中,`historicalTx` 查询的 `select` 遗漏了 `exchangeRate` 字段,导致 `posCostCny` 计算直接使用 `metrics.accumulatedCost`(未经汇率折算的本币值),造成历史投入本金严重失真。
|
||||
- **修复方案**:在 `historicalTx` 的 select 中追加 `exchangeRate: transactions.exchangeRate`;彻底重写 `posCostCny` 计算逻辑:从交易流水中按时间顺序遍历 BUY/SELL,对每笔交易使用 `qty * price * exchangeRate` 手动计算真实法币成本,SELL 时按当前累计法币成本 ÷ 当前数量得出的平均成本扣减,杜绝 `metrics.accumulatedCost` 直接入库。
|
||||
- **Bug 2 - 日常快照反推本金**:`recordDailySnapshot()` 使用 `mv.minus(ap)`(市值 - 累计盈亏)反推本金,违反"绝对禁止反推本金"的架构红线,且会被旧 PnL 数据污染。
|
||||
- **修复方案**:将 `totalCostCny` 计算改为直接累加底层 `totalCostCny` 字段:`positions.reduce((sum, pos) => sum.plus(new Big(pos.totalCostCny || '0')), new Big(0))`,确保本金数据原汁原味。
|
||||
- **执行与验收**:成功执行 `scripts/reconstruct.ts` 全量重建 1248 天历史快照;数据库 `portfolio_snapshots` 表已覆写完毕,2022/12/12 节点投入本金精确显示为 `5094.59`。
|
||||
|
||||
## 通过引入 force-dynamic 和 revalidatePath 彻底剥离 Next.js 默认缓存机制,确保走势图等核心财务 UI 与底层数据库的 0 延迟一致性 (Task 78)
|
||||
- 在 `app/layout.tsx`(根布局)和 `app/dashboard/layout.tsx`(Dashboard 布局)顶部强制声明 `export const dynamic = 'force-dynamic'` 与 `export const revalidate = 0`,确保整棵 Server Component 树绝不缓存财务大盘数据。
|
||||
- 在 `app/api/admin/rebuild-snapshots/route.ts` 中引入 `revalidatePath('/dashboard', 'page')` 与 `revalidatePath('/', 'layout')`,在历史快照全量重建并批量 INSERT 入库完成后、返回 Response 之前执行缓存清盘钩子,使 Dashboard 页面下次访问时强制读取最新数据库快照。
|
||||
@ -417,4 +434,12 @@
|
||||
- **数据映射层蛇形/驼峰双重兼容**:在 `chartData` 的 `map` 函数中,采用 `parseFloat(s.totalCostCny) || parseFloat(s.total_cost_cny || 0)` 的强制 fallback 逻辑,确保无论后端返回哪种命名风格都能正确解析。
|
||||
- **Tooltip 防御性解构**:`CustomTooltip` 中的值读取改为 `Number(dataNode.totalValueCny || dataNode._raw?.totalValueCny || 0) || 0`,通过 `_raw` 快照兜底读取,确保 Tooltip 永远能拿到本金和现值的真实数据。
|
||||
- **Snapshot 接口扩展**:新增 `total_value_cny?: string` 和 `total_cost_cny?: string` 可选字段,`ChartDatum` 接口新增 `_raw: Snapshot` 字段用于 Tooltip 层 fallback。
|
||||
- **验收标准**:控制台 `【CHART DATA DEBUG】` 打印出带真实本金(如 5094)的字段;Tooltip 中投入本金显示真实法币数字,彻底消除 704 旧账残影。
|
||||
- **验收标准**:控制台 `【CHART DATA DEBUG】` 打印出带真实本金(如 5094)的字段;Tooltip 中投入本金显示真实法币数字,彻底消除 704 旧账残影。
|
||||
|
||||
## 基于现有生产级 API 鉴权,补充开发了 scripts/trigger-rebuild.ts 本地触发脚本,实现了安全、隔离的本地时光机重置工作流 (Task 84)
|
||||
- 在项目根目录创建 `scripts/trigger-rebuild.ts` 独立触发脚本,作为 `app/api/admin/rebuild-snapshots/route.ts` 的本地运维入口。
|
||||
- 脚本通过 `dotenv` 强制加载 `.env.local` 和 `.env` 环境变量文件,优先读取 `REBUILD_SECRET`,降级读取 `CRON_SECRET`,确保鉴权密钥的安全获取。
|
||||
- **核心逻辑**:脚本向 `http://localhost:8080/api/admin/rebuild-snapshots` 发送 POST 请求,携带 `Authorization: Bearer <secret>` 请求头,复用生产级 Bearer Token 强校验机制,未配置密钥时提前退出。
|
||||
- **架构红线**:`app/api/admin/rebuild-snapshots/route.ts` 中的生产级 POST + Bearer Token 强校验代码未被修改,保持原有的安全隔离设计。
|
||||
- **运行方式**:`npx tsx scripts/trigger-rebuild.ts`(需确保 `npm run dev` 在另一个终端运行且端口为 8080)。
|
||||
- **设计收益**:本地开发者无需记忆 curl 命令或手动构造请求头,通过脚本即可安全触发历史快照重建,降低了运维门槛并保持了与生产鉴权机制的一致性。
|
||||
104
scripts/seed-historical-rates.ts
Normal file
104
scripts/seed-historical-rates.ts
Normal file
@ -0,0 +1,104 @@
|
||||
/**
|
||||
* 历史汇率数据播种脚本
|
||||
* 运行方式: npx tsx scripts/seed-historical-rates.ts
|
||||
*
|
||||
* 功能:解析 scripts/rates.csv,将历史汇率数据分批 upsert 到 exchange_rates_history 表。
|
||||
* 特性:
|
||||
* - 自动剔除 BOM 头 (\uFEFF)
|
||||
* - 按 500 条/批次批量写入
|
||||
* - 联合唯一约束 (fromCurrency, toCurrency, fetchTime) 确保幂等性
|
||||
*/
|
||||
|
||||
import * as fs from 'fs';
|
||||
import * as path from 'path';
|
||||
import { db } from '@/db';
|
||||
import { exchangeRatesHistory } from '@/db/schema';
|
||||
import { sql } from 'drizzle-orm';
|
||||
|
||||
const BATCH_SIZE = 500;
|
||||
|
||||
function parseCsv(filePath: string): typeof exchangeRatesHistory.$inferInsert[] {
|
||||
const raw = fs.readFileSync(filePath, 'utf-8');
|
||||
const content = raw.replace(/^\uFEFF/, '');
|
||||
const lines = content.split('\n').filter(line => line.trim().length > 0);
|
||||
|
||||
// 剔除表头
|
||||
const header = lines[0];
|
||||
if (!header) {
|
||||
throw new Error('CSV file is empty');
|
||||
}
|
||||
|
||||
const records: typeof exchangeRatesHistory.$inferInsert[] = [];
|
||||
for (let i = 1; i < lines.length; i++) {
|
||||
const line = lines[i].replace(/^\uFEFF/, '').trim();
|
||||
if (!line) continue;
|
||||
|
||||
const parts = line.split(',');
|
||||
if (parts.length < 4) {
|
||||
console.warn(`Skipping malformed line ${i + 1}: ${line}`);
|
||||
continue;
|
||||
}
|
||||
|
||||
const [fromCurrency, toCurrency, rateStr, fetchTimeStr] = parts;
|
||||
|
||||
const rate = parseFloat(rateStr.trim());
|
||||
if (isNaN(rate)) {
|
||||
console.warn(`Skipping line ${i + 1} with invalid rate: ${rateStr}`);
|
||||
continue;
|
||||
}
|
||||
|
||||
const fetchTime = new Date(fetchTimeStr.trim());
|
||||
if (isNaN(fetchTime.getTime())) {
|
||||
console.warn(`Skipping line ${i + 1} with invalid date: ${fetchTimeStr}`);
|
||||
continue;
|
||||
}
|
||||
|
||||
records.push({
|
||||
fromCurrency: fromCurrency.trim(),
|
||||
toCurrency: toCurrency.trim(),
|
||||
rate: rate.toString(),
|
||||
fetchTime,
|
||||
});
|
||||
}
|
||||
|
||||
return records;
|
||||
}
|
||||
|
||||
async function seed() {
|
||||
const csvPath = path.join(__dirname, 'rates.csv');
|
||||
|
||||
if (!fs.existsSync(csvPath)) {
|
||||
console.error(`CSV file not found: ${csvPath}`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
console.log(`Reading CSV from: ${csvPath}`);
|
||||
const records = parseCsv(csvPath);
|
||||
console.log(`Parsed ${records.length} valid records.`);
|
||||
|
||||
const batches: typeof exchangeRatesHistory.$inferInsert[][] = [];
|
||||
for (let i = 0; i < records.length; i += BATCH_SIZE) {
|
||||
batches.push(records.slice(i, i + BATCH_SIZE));
|
||||
}
|
||||
|
||||
let totalUpserted = 0;
|
||||
for (let i = 0; i < batches.length; i++) {
|
||||
const batch = batches[i];
|
||||
await db
|
||||
.insert(exchangeRatesHistory)
|
||||
.values(batch)
|
||||
.onConflictDoUpdate({
|
||||
target: [exchangeRatesHistory.fromCurrency, exchangeRatesHistory.toCurrency, exchangeRatesHistory.fetchTime],
|
||||
set: { rate: sql`EXCLUDED.rate` },
|
||||
});
|
||||
totalUpserted += batch.length;
|
||||
console.log(`Batch ${i + 1}/${batches.length}: ${batch.length} records processed (${totalUpserted}/${records.length} total).`);
|
||||
}
|
||||
|
||||
console.log(`Seed complete. Total records processed: ${totalUpserted}`);
|
||||
}
|
||||
|
||||
seed().catch((err) => {
|
||||
console.error('Seed failed:', err);
|
||||
process.exit(1);
|
||||
});
|
||||
38
scripts/trigger-rebuild.ts
Normal file
38
scripts/trigger-rebuild.ts
Normal file
@ -0,0 +1,38 @@
|
||||
import { config } from 'dotenv';
|
||||
// 强制加载所有可能的本地环境变量文件
|
||||
config({ path: ['.env.local', '.env'] });
|
||||
|
||||
const triggerRebuild = async () => {
|
||||
// 优先读取 REBUILD_SECRET,降级读取 CRON_SECRET
|
||||
const secret = process.env.REBUILD_SECRET || process.env.CRON_SECRET;
|
||||
|
||||
if (!secret) {
|
||||
console.error('❌ 致命错误: 未在 .env.local 或 .env 找到 REBUILD_SECRET 或 CRON_SECRET');
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
console.log('🚀 正在携带合法 Token 请求时光机重置接口...');
|
||||
|
||||
try {
|
||||
// 默认请求本地 8080 端口,确保 Next.js 服务正在运行
|
||||
const response = await fetch('http://localhost:8080/api/admin/rebuild-snapshots', {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Authorization': `Bearer ${secret}`,
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
});
|
||||
|
||||
const data = await response.json();
|
||||
|
||||
if (response.ok) {
|
||||
console.log('✅ 历史快照重建成功!', JSON.stringify(data, null, 2));
|
||||
} else {
|
||||
console.error('❌ 重建失败,服务器返回:', data);
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('❌ 请求发送异常 (请确认 npm run dev 正在运行且端口为 8080):', error);
|
||||
}
|
||||
};
|
||||
|
||||
triggerRebuild();
|
||||
@ -32,13 +32,8 @@ export async function recordDailySnapshot() {
|
||||
new Big(0)
|
||||
).toString();
|
||||
|
||||
// 推导真实投入本金 CNY = 市值 - 累计盈亏
|
||||
const totalCostCny = positions.reduce(
|
||||
(sum, pos) => {
|
||||
const mv = new Big(pos.marketValueCny || '0');
|
||||
const ap = new Big(pos.accumulatedPnlCny || '0');
|
||||
return sum.plus(mv.minus(ap));
|
||||
},
|
||||
(sum, pos) => sum.plus(new Big(pos.totalCostCny || '0')),
|
||||
new Big(0)
|
||||
).toString();
|
||||
|
||||
@ -357,6 +352,7 @@ export async function reconstructPortfolioHistory() {
|
||||
quantity: transactions.quantity,
|
||||
price: transactions.price,
|
||||
fee: transactions.fee,
|
||||
exchangeRate: transactions.exchangeRate,
|
||||
})
|
||||
.from(transactions)
|
||||
.where(lte(transactions.executedAt, currentDate))
|
||||
@ -399,7 +395,30 @@ export async function reconstructPortfolioHistory() {
|
||||
const metrics = calculateAssetMetrics(assetTxs, priceStrForMetrics);
|
||||
|
||||
const posValueCny = new Big(metrics.marketValue).times(snapshotFxRate);
|
||||
const posCostCny = new Big(metrics.accumulatedCost || '0');
|
||||
|
||||
// 使用交易时的真实汇率计算法币本金,而非直接用 metrics.accumulatedCost
|
||||
let calculatedFiatCost = new Big(0);
|
||||
const rawTxs = historicalTx.filter(t => t.assetId === assetId && (t.txType === 'BUY' || t.txType === 'SELL' || t.txType === 'DIVIDEND'));
|
||||
let currentQty = new Big(0);
|
||||
for (const tx of rawTxs) {
|
||||
const qty = new Big(tx.quantity);
|
||||
const fx = new Big(tx.exchangeRate || '1');
|
||||
const price = new Big(tx.price);
|
||||
|
||||
if (tx.txType === 'BUY') {
|
||||
currentQty = currentQty.plus(qty);
|
||||
calculatedFiatCost = calculatedFiatCost.plus(qty.times(price).times(fx));
|
||||
} else if (tx.txType === 'SELL') {
|
||||
let avgFiatCostPerUnit = new Big(0);
|
||||
if (currentQty.gt(0)) {
|
||||
avgFiatCostPerUnit = calculatedFiatCost.div(currentQty);
|
||||
}
|
||||
calculatedFiatCost = calculatedFiatCost.minus(avgFiatCostPerUnit.times(qty));
|
||||
currentQty = currentQty.minus(qty);
|
||||
}
|
||||
}
|
||||
|
||||
const posCostCny = calculatedFiatCost.gt(0) ? calculatedFiatCost : new Big(0);
|
||||
|
||||
totalValueCny = totalValueCny.plus(posValueCny);
|
||||
totalCostCny = totalCostCny.plus(posCostCny);
|
||||
|
||||
@ -105,4 +105,6 @@ export const exchangeRatesHistory = pgTable("exchange_rates_history", {
|
||||
createdAt: timestamp("created_at", { withTimezone: true, mode: "date" })
|
||||
.defaultNow()
|
||||
.notNull(),
|
||||
});
|
||||
}, (table) => ({
|
||||
unq: unique("rate_time_unq").on(table.fromCurrency, table.toCurrency, table.fetchTime),
|
||||
}));
|
||||
|
||||
Loading…
Reference in New Issue
Block a user