@ -1,6 +1,5 @@
import { DataProviderService } from '@ghostfolio/api/services/data-provider/data-provider.service' ;
import { DataProviderService } from '@ghostfolio/api/services/data-provider/data-provider.service' ;
import { IDataGatheringItem } from '@ghostfolio/api/services/interfaces/interfaces' ;
import { IDataGatheringItem } from '@ghostfolio/api/services/interfaces/interfaces' ;
import { PrismaService } from '@ghostfolio/api/services/prisma/prisma.service' ;
import {
import {
DATA_GATHERING_QUEUE ,
DATA_GATHERING_QUEUE ,
GATHER_ASSET_PROFILE_PROCESS ,
GATHER_ASSET_PROFILE_PROCESS ,
@ -11,6 +10,7 @@ import { UniqueAsset } from '@ghostfolio/common/interfaces';
import { MarketDataService } from '@ghostfolio/api/services/market-data/market-data.service' ;
import { MarketDataService } from '@ghostfolio/api/services/market-data/market-data.service' ;
import { Process , Processor } from '@nestjs/bull' ;
import { Process , Processor } from '@nestjs/bull' ;
import { Injectable , Logger } from '@nestjs/common' ;
import { Injectable , Logger } from '@nestjs/common' ;
import { Prisma } from '@prisma/client' ;
import { Job } from 'bull' ;
import { Job } from 'bull' ;
import {
import {
format ,
format ,
@ -32,7 +32,7 @@ export class DataGatheringProcessor {
private readonly marketDataService : MarketDataService
private readonly marketDataService : MarketDataService
) { }
) { }
@Process ( GATHER_ASSET_PROFILE_PROCESS)
@Process ( { concurrency: 1 , name : GATHER_ASSET_PROFILE_PROCESS } )
public async gatherAssetProfile ( job : Job < UniqueAsset > ) {
public async gatherAssetProfile ( job : Job < UniqueAsset > ) {
try {
try {
await this . dataGatheringService . gatherAssetProfiles ( [ job . data ] ) ;
await this . dataGatheringService . gatherAssetProfiles ( [ job . data ] ) ;
@ -46,18 +46,27 @@ export class DataGatheringProcessor {
}
}
}
}
@Process ( GATHER_HISTORICAL_MARKET_DATA_PROCESS)
@Process ( { concurrency: 1 , name : GATHER_HISTORICAL_MARKET_DATA_PROCESS } )
public async gatherHistoricalMarketData ( job : Job < IDataGatheringItem > ) {
public async gatherHistoricalMarketData ( job : Job < IDataGatheringItem > ) {
try {
try {
const { dataSource , date , symbol } = job . data ;
const { dataSource , date , symbol } = job . data ;
let currentDate = parseISO ( < string > ( < unknown > date ) ) ;
Logger . log (
` Historical market data gathering has been started for ${ symbol } ( ${ dataSource } ) at ${ format (
currentDate ,
DATE_FORMAT
) } ` ,
` DataGatheringProcessor ( ${ GATHER_HISTORICAL_MARKET_DATA_PROCESS } ) `
) ;
const historicalData = await this . dataProviderService . getHistoricalRaw (
const historicalData = await this . dataProviderService . getHistoricalRaw (
[ { dataSource , symbol } ] ,
[ { dataSource , symbol } ] ,
parseISO ( < string > ( < unknown > date ) ) ,
currentDate ,
new Date ( )
new Date ( )
) ;
) ;
let currentDate = parseISO ( < string > ( < unknown > date ) ) ;
const data : Prisma.MarketDataUpdateInput [ ] = [ ] ;
let lastMarketPrice : number ;
let lastMarketPrice : number ;
while (
while (
@ -83,21 +92,13 @@ export class DataGatheringProcessor {
}
}
if ( lastMarketPrice ) {
if ( lastMarketPrice ) {
try {
data . push ( {
await this . marketDataService . updateMarketData ( {
dataSource ,
data : {
symbol ,
marketPrice : lastMarketPrice ,
date : getStartOfUtcDate ( currentDate ) ,
state : 'CLOSE'
marketPrice : lastMarketPrice ,
} ,
state : 'CLOSE'
where : {
} ) ;
dataSource_date_symbol : {
dataSource ,
symbol ,
date : getStartOfUtcDate ( currentDate )
}
}
} ) ;
} catch { }
}
}
// Count month one up for iteration
// Count month one up for iteration
@ -111,8 +112,13 @@ export class DataGatheringProcessor {
) ;
) ;
}
}
await this . marketDataService . updateMany ( { data } ) ;
Logger . log (
Logger . log (
` Historical market data gathering has been completed for ${ symbol } ( ${ dataSource } ). ` ,
` Historical market data gathering has been completed for ${ symbol } ( ${ dataSource } ) at ${ format (
currentDate ,
DATE_FORMAT
) } ` ,
` DataGatheringProcessor ( ${ GATHER_HISTORICAL_MARKET_DATA_PROCESS } ) `
` DataGatheringProcessor ( ${ GATHER_HISTORICAL_MARKET_DATA_PROCESS } ) `
) ;
) ;
} catch ( error ) {
} catch ( error ) {