fix(api): 重构多市场日期解析并引入 upsert,彻底解决日期错位与重复写入问题

This commit is contained in:
kennethcheng 2026-05-02 13:38:01 +08:00
parent 7ded5b7837
commit c243ba4f35
3 changed files with 60 additions and 52 deletions

View File

@ -1,5 +1,12 @@
# Omniledger 架构与开发记忆 (Memory)
## 建立 asset_id 与 date 的联合唯一索引,重构三套跨市场日期解析正则,实现基于 onConflictDoUpdate 的价格历史幂等覆盖逻辑 (Task 65)
- 在 `src/db/schema.ts``assetPricesHistory` 表中新增 `updateTime` 字段 (`timestamp('update_time').defaultNow()`),并将联合索引从 `uniqueIndex` 升级为 `unique()` 约束 (`unique().on(table.assetId, table.date)`),确保 `(assetId, date)` 在物理层严格唯一。
- 在 `app/api/cron/fetch-prices/route.ts` 中一字不差地注入 `parseMarketDate(rawString: string)` 傻瓜式日期解析引擎:从腾讯财经 gtimg 原始响应的 Index 30 提取日期,支持 A股 (14位数字→YYYY-MM-DD)、港股 (斜杠分隔→YYYY-MM-DD)、美股 (横杠分隔→YYYY-MM-DD) 三种格式,含 try/catch 极端兜底。
- 废弃原有的 `SELECT → UPDATE/INSERT` 双步查询逻辑,全面替换为 Drizzle 的 `onConflictDoUpdate` UPSERT 语法:基于联合唯一约束,冲突时只更新 `price``updateTime`,实现完全幂等的价格覆盖。
- 移除不再使用的 `skippedCount` 计数器与 `eq` 导入,简化响应结构。
- 执行 `drizzle-kit push` 完成物理迁移,`curl` 测试确认:美股存入 `2026-05-01`港A股存入 `2026-04-30`;重复调用不产生新行,`update_time` 正确刷新。
## 升级时光机历史快照生成逻辑,引入就近汇率匹配策略 (Closest Rate Matching),消除因使用单一日结汇率导致的历史资产估值失真 (Task 64)
- 在 `src/actions/snapshots.ts``reconstructPortfolioHistory()` 中,废弃从静态 `exchangeRates` 表获取当前汇率的旧逻辑,全面接入 `exchangeRatesHistory` 历史汇率时间序列表。
- **架构调整**:在 `dayLoop` 循环之前,一次性加载全部 `exchangeRatesHistory` 记录到内存,按 `(fromCurrency, toCurrency)` 键分组构建 `ratesCache``Map<string, RateRecord[]>`),每条记录已按 `fetchTime` 升序排列。

View File

@ -1,7 +1,7 @@
import { NextResponse } from 'next/server';
import { db } from '@/db';
import { assets, assetPricesHistory } from '@/db/schema';
import { eq, inArray } from 'drizzle-orm';
import { inArray } from 'drizzle-orm';
import { ProxyAgent, setGlobalDispatcher } from 'undici';
export const dynamic = 'force-dynamic';
@ -14,21 +14,36 @@ function formatDateStr(date: Date): string {
return `${year}-${month}-${day}`;
}
function parseMarketDate(rawString: string): string {
const parts = rawString.split('~');
const rawDate = parts[30];
if (!rawDate) return new Date().toISOString().split('T')[0];
if (/^\d{14}$/.test(rawDate)) {
return `${rawDate.slice(0, 4)}-${rawDate.slice(4, 6)}-${rawDate.slice(6, 8)}`;
}
if (rawDate.includes('/')) {
return rawDate.split(' ')[0].replace(/\//g, '-');
}
return rawDate.split(' ')[0];
}
function parseMarketDate(rawString: string): string {
try {
const parts = rawString.split('~');
const rawDate = parts[30];
if (!rawDate) throw new Error("Missing date part");
// 1. A股 (20260430161416) -> 截取前8位并拼接
if (/^\d{14}$/.test(rawDate)) {
return `${rawDate.slice(0, 4)}-${rawDate.slice(4, 6)}-${rawDate.slice(6, 8)}`;
}
// 2. 港股 (2026/04/30 16:08:24) -> 截取日期并替换斜杠
if (rawDate.includes('/')) {
return rawDate.split(' ')[0].replace(/\//g, '-');
}
// 3. 美股 (2026-05-01 09:31:00) -> 直接截取日期部分
if (rawDate.includes('-')) {
return rawDate.split(' ')[0];
}
throw new Error(`Unrecognized date format: ${rawDate}`);
} catch (e) {
console.warn("Date parse fallback triggered: ", e);
// 极端兜底,实际不应该走到这里
const today = new Date();
return `${today.getFullYear()}-${String(today.getMonth() + 1).padStart(2, '0')}-${String(today.getDate()).padStart(2, '0')}`;
}
}
async function fetchStockPrice(asset: { symbol: string; exchange: string | null }): Promise<{ price: string | null; rawResponse: string | null }> {
const cleanSymbol = asset.symbol.trim().toUpperCase().replace(/[^0-9A-Z.\-]/g, '');
@ -116,13 +131,11 @@ export async function GET(req: Request) {
message: 'No active assets to sync',
date: dateStr,
synced: 0,
skipped: 0,
failed: 0,
});
}
let syncedCount = 0;
let skippedCount = 0;
let failedCount = 0;
const results: Array<{ symbol: string; price: string | null; status: string }> = [];
@ -146,38 +159,25 @@ export async function GET(req: Request) {
continue;
}
const entryDate = asset.type === 'STOCK' && rawResponse ? parseMarketDate(rawResponse) : dateStr;
const parsedDate = asset.type === 'STOCK' && rawResponse ? parseMarketDate(rawResponse) : dateStr;
const existing = await db
.select()
.from(assetPricesHistory)
.where(
eq(assetPricesHistory.assetId, asset.id)
)
.then((rows) =>
rows.filter((row) => row.date === entryDate)
);
await db.insert(assetPricesHistory)
.values({
assetId: asset.id,
date: parsedDate,
price: price.toString(),
updateTime: new Date()
})
.onConflictDoUpdate({
target: [assetPricesHistory.assetId, assetPricesHistory.date],
set: {
price: price.toString(),
updateTime: new Date()
}
});
if (existing.length > 0) {
await db
.update(assetPricesHistory)
.set({ price })
.where(
eq(assetPricesHistory.id, existing[0].id)
);
skippedCount++;
results.push({ symbol: asset.symbol, price, status: 'updated' });
} else {
await db
.insert(assetPricesHistory)
.values({
assetId: asset.id,
price,
date: entryDate,
});
syncedCount++;
results.push({ symbol: asset.symbol, price, status: 'inserted' });
}
syncedCount++;
results.push({ symbol: asset.symbol, price, status: 'upserted' });
} catch (error) {
failedCount++;
results.push({ symbol: asset.symbol, price: null, status: 'error' });
@ -187,9 +187,8 @@ export async function GET(req: Request) {
return NextResponse.json({
success: true,
date: dateStr,
date: dateStr,
synced: syncedCount,
skipped: skippedCount,
failed: failedCount,
details: results,
});

View File

@ -1,4 +1,4 @@
import { pgTable, uuid, varchar, timestamp, pgEnum, numeric, uniqueIndex, date } from "drizzle-orm/pg-core";
import { pgTable, uuid, varchar, timestamp, pgEnum, numeric, uniqueIndex, unique, date } from "drizzle-orm/pg-core";
export const users = pgTable("users", {
id: uuid("id").primaryKey().$defaultFn(() => crypto.randomUUID()),
@ -86,11 +86,13 @@ export const assetPricesHistory = pgTable("asset_prices_history", {
.references(() => assets.id),
price: numeric("price", { precision: 36, scale: 18 }).notNull(),
date: date("date", { mode: "string" }).notNull(),
updateTime: timestamp("update_time", { withTimezone: true, mode: "date" })
.defaultNow(),
createdAt: timestamp("created_at", { withTimezone: true, mode: "date" })
.defaultNow()
.notNull(),
}, (table) => [
uniqueIndex("asset_price_date_idx").on(table.assetId, table.date),
unique().on(table.assetId, table.date),
]);
export const exchangeRatesHistory = pgTable("exchange_rates_history", {