From 623a284ba4bea9db2c4ce5404e3c20df20ee264b Mon Sep 17 00:00:00 2001 From: Thomas Kaul <4159106+dtslvr@users.noreply.github.com> Date: Sat, 29 Apr 2023 10:12:50 +0200 Subject: [PATCH] Feature/add and update historical data in bulk (#1904) * Upsert historical data in bulk * Update changelog --- CHANGELOG.md | 4 ++ .../data-gathering.processor.ts | 48 +++++++++++-------- test/import/ok-vti-buy-long-history.json | 38 +++++++++++++++ 3 files changed, 69 insertions(+), 21 deletions(-) create mode 100644 test/import/ok-vti-buy-long-history.json diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f5406964..64b539402 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Extended the support of the impersonation mode for local development +### Changed + +- Improved the queue jobs implementation by adding / updating historical market data in bulk + ### Fixed - Improved the holdings table by showing the cash position also when the filter contains the accounts, so that we can see the total allocation for that account diff --git a/apps/api/src/services/data-gathering/data-gathering.processor.ts b/apps/api/src/services/data-gathering/data-gathering.processor.ts index 063666ab0..241ac18b0 100644 --- a/apps/api/src/services/data-gathering/data-gathering.processor.ts +++ b/apps/api/src/services/data-gathering/data-gathering.processor.ts @@ -1,6 +1,5 @@ import { DataProviderService } from '@ghostfolio/api/services/data-provider/data-provider.service'; import { IDataGatheringItem } from '@ghostfolio/api/services/interfaces/interfaces'; -import { PrismaService } from '@ghostfolio/api/services/prisma/prisma.service'; import { DATA_GATHERING_QUEUE, GATHER_ASSET_PROFILE_PROCESS, @@ -11,6 +10,7 @@ import { UniqueAsset } from '@ghostfolio/common/interfaces'; import { MarketDataService } from '@ghostfolio/api/services/market-data/market-data.service'; import { Process, Processor } from '@nestjs/bull'; import { Injectable, Logger } from '@nestjs/common'; +import { Prisma } from '@prisma/client'; import { Job } from 'bull'; import { format, @@ -32,7 +32,7 @@ export class DataGatheringProcessor { private readonly marketDataService: MarketDataService ) {} - @Process(GATHER_ASSET_PROFILE_PROCESS) + @Process({ concurrency: 1, name: GATHER_ASSET_PROFILE_PROCESS }) public async gatherAssetProfile(job: Job) { try { await this.dataGatheringService.gatherAssetProfiles([job.data]); @@ -46,18 +46,27 @@ export class DataGatheringProcessor { } } - @Process(GATHER_HISTORICAL_MARKET_DATA_PROCESS) + @Process({ concurrency: 1, name: GATHER_HISTORICAL_MARKET_DATA_PROCESS }) public async gatherHistoricalMarketData(job: Job) { try { const { dataSource, date, symbol } = job.data; + let currentDate = parseISO((date)); + + Logger.log( + `Historical market data gathering has been started for ${symbol} (${dataSource}) at ${format( + currentDate, + DATE_FORMAT + )}`, + `DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS})` + ); const historicalData = await this.dataProviderService.getHistoricalRaw( [{ dataSource, symbol }], - parseISO((date)), + currentDate, new Date() ); - let currentDate = parseISO((date)); + const data: Prisma.MarketDataUpdateInput[] = []; let lastMarketPrice: number; while ( @@ -83,21 +92,13 @@ export class DataGatheringProcessor { } if (lastMarketPrice) { - try { - await this.marketDataService.updateMarketData({ - data: { - marketPrice: lastMarketPrice, - state: 'CLOSE' - }, - where: { - dataSource_date_symbol: { - dataSource, - symbol, - date: getStartOfUtcDate(currentDate) - } - } - }); - } catch {} + data.push({ + dataSource, + symbol, + date: getStartOfUtcDate(currentDate), + marketPrice: lastMarketPrice, + state: 'CLOSE' + }); } // Count month one up for iteration @@ -111,8 +112,13 @@ export class DataGatheringProcessor { ); } + await this.marketDataService.updateMany({ data }); + Logger.log( - `Historical market data gathering has been completed for ${symbol} (${dataSource}).`, + `Historical market data gathering has been completed for ${symbol} (${dataSource}) at ${format( + currentDate, + DATE_FORMAT + )}`, `DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS})` ); } catch (error) { diff --git a/test/import/ok-vti-buy-long-history.json b/test/import/ok-vti-buy-long-history.json new file mode 100644 index 000000000..e6020e405 --- /dev/null +++ b/test/import/ok-vti-buy-long-history.json @@ -0,0 +1,38 @@ +{ + "meta": { + "date": "2023-04-29T00:00:00.000Z", + "version": "dev" + }, + "activities": [ + { + "fee": 0, + "quantity": 10, + "type": "BUY", + "unitPrice": 65.31, + "currency": "USD", + "dataSource": "YAHOO", + "date": "2012-01-02T22:00:00.000Z", + "symbol": "VTI" + }, + { + "fee": 0, + "quantity": 10, + "type": "BUY", + "unitPrice": 65.40, + "currency": "USD", + "dataSource": "YAHOO", + "date": "2011-01-02T22:00:00.000Z", + "symbol": "VTI" + }, + { + "fee": 0, + "quantity": 10, + "type": "BUY", + "unitPrice": 57.05, + "currency": "USD", + "dataSource": "YAHOO", + "date": "2010-01-03T22:00:00.000Z", + "symbol": "VTI" + } + ] +}