Feature/migrate historical market data gathering to queue design pattern (#991)

* Migrate historical market data gathering to queue

* Filter and delete jobs

* Detect duplicate jobs

* Update changelog
pull/1000/head
Thomas Kaul 2 years ago committed by GitHub
parent 2abe399ebd
commit 557e3a0676
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed ### Changed
- Migrated the historical market data gathering to the queue design pattern
- Extended the queue jobs view in the admin control panel by the number of attempts and the status
- Refreshed the cryptocurrencies list to support more coins by default - Refreshed the cryptocurrencies list to support more coins by default
- Increased the historical data chart of the _Fear & Greed Index_ (market mood) to 180 days - Increased the historical data chart of the _Fear & Greed Index_ (market mood) to 180 days
- Upgraded `chart.js` from version `3.7.0` to `3.8.0` - Upgraded `chart.js` from version `3.7.0` to `3.8.0`

@ -2,8 +2,8 @@ import { DataGatheringService } from '@ghostfolio/api/services/data-gathering.se
import { MarketDataService } from '@ghostfolio/api/services/market-data.service'; import { MarketDataService } from '@ghostfolio/api/services/market-data.service';
import { PropertyDto } from '@ghostfolio/api/services/property/property.dto'; import { PropertyDto } from '@ghostfolio/api/services/property/property.dto';
import { import {
DATA_GATHERING_QUEUE, GATHER_ASSET_PROFILE_PROCESS,
GATHER_ASSET_PROFILE_PROCESS GATHER_ASSET_PROFILE_PROCESS_OPTIONS
} from '@ghostfolio/common/config'; } from '@ghostfolio/common/config';
import { import {
AdminData, AdminData,
@ -12,7 +12,6 @@ import {
} from '@ghostfolio/common/interfaces'; } from '@ghostfolio/common/interfaces';
import { hasPermission, permissions } from '@ghostfolio/common/permissions'; import { hasPermission, permissions } from '@ghostfolio/common/permissions';
import type { RequestWithUser } from '@ghostfolio/common/types'; import type { RequestWithUser } from '@ghostfolio/common/types';
import { InjectQueue } from '@nestjs/bull';
import { import {
Body, Body,
Controller, Controller,
@ -28,7 +27,6 @@ import {
import { REQUEST } from '@nestjs/core'; import { REQUEST } from '@nestjs/core';
import { AuthGuard } from '@nestjs/passport'; import { AuthGuard } from '@nestjs/passport';
import { DataSource, MarketData } from '@prisma/client'; import { DataSource, MarketData } from '@prisma/client';
import { Queue } from 'bull';
import { isDate } from 'date-fns'; import { isDate } from 'date-fns';
import { StatusCodes, getReasonPhrase } from 'http-status-codes'; import { StatusCodes, getReasonPhrase } from 'http-status-codes';
@ -39,8 +37,6 @@ import { UpdateMarketDataDto } from './update-market-data.dto';
export class AdminController { export class AdminController {
public constructor( public constructor(
private readonly adminService: AdminService, private readonly adminService: AdminService,
@InjectQueue(DATA_GATHERING_QUEUE)
private readonly dataGatheringQueue: Queue,
private readonly dataGatheringService: DataGatheringService, private readonly dataGatheringService: DataGatheringService,
private readonly marketDataService: MarketDataService, private readonly marketDataService: MarketDataService,
@Inject(REQUEST) private readonly request: RequestWithUser @Inject(REQUEST) private readonly request: RequestWithUser
@ -64,6 +60,24 @@ export class AdminController {
return this.adminService.get(); return this.adminService.get();
} }
@Post('gather')
@UseGuards(AuthGuard('jwt'))
public async gather7Days(): Promise<void> {
if (
!hasPermission(
this.request.user.permissions,
permissions.accessAdminControl
)
) {
throw new HttpException(
getReasonPhrase(StatusCodes.FORBIDDEN),
StatusCodes.FORBIDDEN
);
}
this.dataGatheringService.gather7Days();
}
@Post('gather/max') @Post('gather/max')
@UseGuards(AuthGuard('jwt')) @UseGuards(AuthGuard('jwt'))
public async gatherMax(): Promise<void> { public async gatherMax(): Promise<void> {
@ -82,10 +96,14 @@ export class AdminController {
const uniqueAssets = await this.dataGatheringService.getUniqueAssets(); const uniqueAssets = await this.dataGatheringService.getUniqueAssets();
for (const { dataSource, symbol } of uniqueAssets) { for (const { dataSource, symbol } of uniqueAssets) {
await this.dataGatheringQueue.add(GATHER_ASSET_PROFILE_PROCESS, { await this.dataGatheringService.addJobToQueue(
dataSource, GATHER_ASSET_PROFILE_PROCESS,
symbol {
}); dataSource,
symbol
},
GATHER_ASSET_PROFILE_PROCESS_OPTIONS
);
} }
this.dataGatheringService.gatherMax(); this.dataGatheringService.gatherMax();
@ -109,10 +127,14 @@ export class AdminController {
const uniqueAssets = await this.dataGatheringService.getUniqueAssets(); const uniqueAssets = await this.dataGatheringService.getUniqueAssets();
for (const { dataSource, symbol } of uniqueAssets) { for (const { dataSource, symbol } of uniqueAssets) {
await this.dataGatheringQueue.add(GATHER_ASSET_PROFILE_PROCESS, { await this.dataGatheringService.addJobToQueue(
dataSource, GATHER_ASSET_PROFILE_PROCESS,
symbol {
}); dataSource,
symbol
},
GATHER_ASSET_PROFILE_PROCESS_OPTIONS
);
} }
} }
@ -134,10 +156,14 @@ export class AdminController {
); );
} }
await this.dataGatheringQueue.add(GATHER_ASSET_PROFILE_PROCESS, { await this.dataGatheringService.addJobToQueue(
dataSource, GATHER_ASSET_PROFILE_PROCESS,
symbol {
}); dataSource,
symbol
},
GATHER_ASSET_PROFILE_PROCESS_OPTIONS
);
} }
@Post('gather/:dataSource/:symbol') @Post('gather/:dataSource/:symbol')

@ -42,8 +42,6 @@ export class AdminService {
public async get(): Promise<AdminData> { public async get(): Promise<AdminData> {
return { return {
dataGatheringProgress:
await this.dataGatheringService.getDataGatheringProgress(),
exchangeRates: this.exchangeRateDataService exchangeRates: this.exchangeRateDataService
.getCurrencies() .getCurrencies()
.filter((currency) => { .filter((currency) => {
@ -60,7 +58,6 @@ export class AdminService {
) )
}; };
}), }),
lastDataGathering: await this.getLastDataGathering(),
settings: await this.propertyService.get(), settings: await this.propertyService.get(),
transactionCount: await this.prismaService.order.count(), transactionCount: await this.prismaService.order.count(),
userCount: await this.prismaService.user.count(), userCount: await this.prismaService.user.count(),
@ -161,30 +158,11 @@ export class AdminService {
if (key === PROPERTY_CURRENCIES) { if (key === PROPERTY_CURRENCIES) {
await this.exchangeRateDataService.initialize(); await this.exchangeRateDataService.initialize();
await this.dataGatheringService.reset();
} }
return response; return response;
} }
private async getLastDataGathering() {
const lastDataGathering =
await this.dataGatheringService.getLastDataGathering();
if (lastDataGathering) {
return lastDataGathering;
}
const dataGatheringInProgress =
await this.dataGatheringService.getIsInProgress();
if (dataGatheringInProgress) {
return 'IN_PROGRESS';
}
return undefined;
}
private async getUsersWithAnalytics(): Promise<AdminData['users']> { private async getUsersWithAnalytics(): Promise<AdminData['users']> {
const usersWithAnalytics = await this.prismaService.user.findMany({ const usersWithAnalytics = await this.prismaService.user.findMany({
orderBy: { orderBy: {

@ -3,13 +3,17 @@ import { hasPermission, permissions } from '@ghostfolio/common/permissions';
import type { RequestWithUser } from '@ghostfolio/common/types'; import type { RequestWithUser } from '@ghostfolio/common/types';
import { import {
Controller, Controller,
Delete,
Get, Get,
HttpException, HttpException,
Inject, Inject,
Param,
Query,
UseGuards UseGuards
} from '@nestjs/common'; } from '@nestjs/common';
import { REQUEST } from '@nestjs/core'; import { REQUEST } from '@nestjs/core';
import { AuthGuard } from '@nestjs/passport'; import { AuthGuard } from '@nestjs/passport';
import { JobStatus } from 'bull';
import { StatusCodes, getReasonPhrase } from 'http-status-codes'; import { StatusCodes, getReasonPhrase } from 'http-status-codes';
import { QueueService } from './queue.service'; import { QueueService } from './queue.service';
@ -21,9 +25,11 @@ export class QueueController {
@Inject(REQUEST) private readonly request: RequestWithUser @Inject(REQUEST) private readonly request: RequestWithUser
) {} ) {}
@Get('jobs') @Delete('job')
@UseGuards(AuthGuard('jwt')) @UseGuards(AuthGuard('jwt'))
public async getJobs(): Promise<AdminJobs> { public async deleteJobs(
@Query('status') filterByStatus?: string
): Promise<void> {
if ( if (
!hasPermission( !hasPermission(
this.request.user.permissions, this.request.user.permissions,
@ -36,6 +42,46 @@ export class QueueController {
); );
} }
return this.queueService.getJobs({}); const status = <JobStatus[]>filterByStatus?.split(',') ?? undefined;
return this.queueService.deleteJobs({ status });
}
@Get('job')
@UseGuards(AuthGuard('jwt'))
public async getJobs(
@Query('status') filterByStatus?: string
): Promise<AdminJobs> {
if (
!hasPermission(
this.request.user.permissions,
permissions.accessAdminControl
)
) {
throw new HttpException(
getReasonPhrase(StatusCodes.FORBIDDEN),
StatusCodes.FORBIDDEN
);
}
const status = <JobStatus[]>filterByStatus?.split(',') ?? undefined;
return this.queueService.getJobs({ status });
}
@Delete('job/:id')
@UseGuards(AuthGuard('jwt'))
public async deleteJob(@Param('id') id: string): Promise<void> {
if (
!hasPermission(
this.request.user.permissions,
permissions.accessAdminControl
)
) {
throw new HttpException(
getReasonPhrase(StatusCodes.FORBIDDEN),
StatusCodes.FORBIDDEN
);
}
return this.queueService.deleteJob(id);
} }
} }

@ -1,8 +1,11 @@
import { DATA_GATHERING_QUEUE } from '@ghostfolio/common/config'; import {
DATA_GATHERING_QUEUE,
QUEUE_JOB_STATUS_LIST
} from '@ghostfolio/common/config';
import { AdminJobs } from '@ghostfolio/common/interfaces'; import { AdminJobs } from '@ghostfolio/common/interfaces';
import { InjectQueue } from '@nestjs/bull'; import { InjectQueue } from '@nestjs/bull';
import { Injectable } from '@nestjs/common'; import { Injectable, Logger } from '@nestjs/common';
import { Queue } from 'bull'; import { JobStatus, Queue } from 'bull';
@Injectable() @Injectable()
export class QueueService { export class QueueService {
@ -11,22 +14,52 @@ export class QueueService {
private readonly dataGatheringQueue: Queue private readonly dataGatheringQueue: Queue
) {} ) {}
public async deleteJob(aId: string) {
return (await this.dataGatheringQueue.getJob(aId))?.remove();
}
public async deleteJobs({
status = QUEUE_JOB_STATUS_LIST
}: {
status?: JobStatus[];
}) {
const jobs = await this.dataGatheringQueue.getJobs(status);
for (const job of jobs) {
try {
await job.remove();
} catch (error) {
Logger.warn(error, 'QueueService');
}
}
}
public async getJobs({ public async getJobs({
limit = 1000 limit = 1000,
status = QUEUE_JOB_STATUS_LIST
}: { }: {
limit?: number; limit?: number;
status?: JobStatus[];
}): Promise<AdminJobs> { }): Promise<AdminJobs> {
const jobs = await this.dataGatheringQueue.getJobs([ const jobs = await this.dataGatheringQueue.getJobs(status);
'active',
'completed', const jobsWithState = await Promise.all(
'delayed', jobs.slice(0, limit).map(async (job) => {
'failed', return {
'paused', attemptsMade: job.attemptsMade + 1,
'waiting' data: job.data,
]); finishedOn: job.finishedOn,
id: job.id,
name: job.name,
stacktrace: job.stacktrace,
state: await job.getState(),
timestamp: job.timestamp
};
})
);
return { return {
jobs: jobs.slice(0, limit) jobs: jobsWithState
}; };
} }
} }

@ -1,21 +1,6 @@
import { DataGatheringService } from '@ghostfolio/api/services/data-gathering.service';
import { Controller } from '@nestjs/common'; import { Controller } from '@nestjs/common';
@Controller() @Controller()
export class AppController { export class AppController {
public constructor( public constructor() {}
private readonly dataGatheringService: DataGatheringService
) {
this.initialize();
}
private async initialize() {
const isDataGatheringInProgress =
await this.dataGatheringService.getIsInProgress();
if (isDataGatheringInProgress) {
// Prepare for automatical data gathering, if hung up in progress state
await this.dataGatheringService.reset();
}
}
} }

@ -1,4 +1,3 @@
import { CacheService } from '@ghostfolio/api/app/cache/cache.service';
import { RedisCacheService } from '@ghostfolio/api/app/redis-cache/redis-cache.service'; import { RedisCacheService } from '@ghostfolio/api/app/redis-cache/redis-cache.service';
import { hasPermission, permissions } from '@ghostfolio/common/permissions'; import { hasPermission, permissions } from '@ghostfolio/common/permissions';
import type { RequestWithUser } from '@ghostfolio/common/types'; import type { RequestWithUser } from '@ghostfolio/common/types';
@ -16,7 +15,6 @@ import { StatusCodes, getReasonPhrase } from 'http-status-codes';
@Controller('cache') @Controller('cache')
export class CacheController { export class CacheController {
public constructor( public constructor(
private readonly cacheService: CacheService,
private readonly redisCacheService: RedisCacheService, private readonly redisCacheService: RedisCacheService,
@Inject(REQUEST) private readonly request: RequestWithUser @Inject(REQUEST) private readonly request: RequestWithUser
) {} ) {}
@ -36,8 +34,6 @@ export class CacheController {
); );
} }
this.redisCacheService.reset(); return this.redisCacheService.reset();
return this.cacheService.flush();
} }
} }

@ -1,4 +1,3 @@
import { CacheService } from '@ghostfolio/api/app/cache/cache.service';
import { RedisCacheModule } from '@ghostfolio/api/app/redis-cache/redis-cache.module'; import { RedisCacheModule } from '@ghostfolio/api/app/redis-cache/redis-cache.module';
import { ConfigurationModule } from '@ghostfolio/api/services/configuration.module'; import { ConfigurationModule } from '@ghostfolio/api/services/configuration.module';
import { DataGatheringModule } from '@ghostfolio/api/services/data-gathering.module'; import { DataGatheringModule } from '@ghostfolio/api/services/data-gathering.module';
@ -11,7 +10,6 @@ import { Module } from '@nestjs/common';
import { CacheController } from './cache.controller'; import { CacheController } from './cache.controller';
@Module({ @Module({
exports: [CacheService],
controllers: [CacheController], controllers: [CacheController],
imports: [ imports: [
ConfigurationModule, ConfigurationModule,
@ -21,7 +19,6 @@ import { CacheController } from './cache.controller';
PrismaModule, PrismaModule,
RedisCacheModule, RedisCacheModule,
SymbolProfileModule SymbolProfileModule
], ]
providers: [CacheService]
}) })
export class CacheModule {} export class CacheModule {}

@ -1,15 +0,0 @@
import { DataGatheringService } from '@ghostfolio/api/services/data-gathering.service';
import { Injectable } from '@nestjs/common';
@Injectable()
export class CacheService {
public constructor(
private readonly dataGaterhingService: DataGatheringService
) {}
public async flush(): Promise<void> {
await this.dataGaterhingService.reset();
return;
}
}

@ -106,7 +106,6 @@ export class InfoService {
baseCurrency: this.configurationService.get('BASE_CURRENCY'), baseCurrency: this.configurationService.get('BASE_CURRENCY'),
currencies: this.exchangeRateDataService.getCurrencies(), currencies: this.exchangeRateDataService.getCurrencies(),
demoAuthToken: this.getDemoAuthToken(), demoAuthToken: this.getDemoAuthToken(),
lastDataGathering: await this.getLastDataGathering(),
statistics: await this.getStatistics(), statistics: await this.getStatistics(),
subscriptions: await this.getSubscriptions(), subscriptions: await this.getSubscriptions(),
tags: await this.tagService.get() tags: await this.tagService.get()
@ -215,13 +214,6 @@ export class InfoService {
}); });
} }
private async getLastDataGathering() {
const lastDataGathering =
await this.dataGatheringService.getLastDataGathering();
return lastDataGathering ?? null;
}
private async getStatistics() { private async getStatistics() {
if (!this.configurationService.get('ENABLE_FEATURE_STATISTICS')) { if (!this.configurationService.get('ENABLE_FEATURE_STATISTICS')) {
return undefined; return undefined;

@ -1,16 +1,14 @@
import { AccountService } from '@ghostfolio/api/app/account/account.service'; import { AccountService } from '@ghostfolio/api/app/account/account.service';
import { CacheService } from '@ghostfolio/api/app/cache/cache.service';
import { DataGatheringService } from '@ghostfolio/api/services/data-gathering.service'; import { DataGatheringService } from '@ghostfolio/api/services/data-gathering.service';
import { ExchangeRateDataService } from '@ghostfolio/api/services/exchange-rate-data.service'; import { ExchangeRateDataService } from '@ghostfolio/api/services/exchange-rate-data.service';
import { PrismaService } from '@ghostfolio/api/services/prisma.service'; import { PrismaService } from '@ghostfolio/api/services/prisma.service';
import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile.service'; import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile.service';
import { import {
DATA_GATHERING_QUEUE, GATHER_ASSET_PROFILE_PROCESS,
GATHER_ASSET_PROFILE_PROCESS GATHER_ASSET_PROFILE_PROCESS_OPTIONS
} from '@ghostfolio/common/config'; } from '@ghostfolio/common/config';
import { Filter } from '@ghostfolio/common/interfaces'; import { Filter } from '@ghostfolio/common/interfaces';
import { OrderWithAccount } from '@ghostfolio/common/types'; import { OrderWithAccount } from '@ghostfolio/common/types';
import { InjectQueue } from '@nestjs/bull';
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { import {
AssetClass, AssetClass,
@ -21,7 +19,6 @@ import {
Type as TypeOfOrder Type as TypeOfOrder
} from '@prisma/client'; } from '@prisma/client';
import Big from 'big.js'; import Big from 'big.js';
import { Queue } from 'bull';
import { endOfToday, isAfter } from 'date-fns'; import { endOfToday, isAfter } from 'date-fns';
import { groupBy } from 'lodash'; import { groupBy } from 'lodash';
import { v4 as uuidv4 } from 'uuid'; import { v4 as uuidv4 } from 'uuid';
@ -32,11 +29,8 @@ import { Activity } from './interfaces/activities.interface';
export class OrderService { export class OrderService {
public constructor( public constructor(
private readonly accountService: AccountService, private readonly accountService: AccountService,
private readonly cacheService: CacheService,
@InjectQueue(DATA_GATHERING_QUEUE)
private readonly dataGatheringQueue: Queue,
private readonly exchangeRateDataService: ExchangeRateDataService,
private readonly dataGatheringService: DataGatheringService, private readonly dataGatheringService: DataGatheringService,
private readonly exchangeRateDataService: ExchangeRateDataService,
private readonly prismaService: PrismaService, private readonly prismaService: PrismaService,
private readonly symbolProfileService: SymbolProfileService private readonly symbolProfileService: SymbolProfileService
) {} ) {}
@ -120,10 +114,14 @@ export class OrderService {
data.SymbolProfile.connectOrCreate.create.symbol.toUpperCase(); data.SymbolProfile.connectOrCreate.create.symbol.toUpperCase();
} }
await this.dataGatheringQueue.add(GATHER_ASSET_PROFILE_PROCESS, { await this.dataGatheringService.addJobToQueue(
dataSource: data.SymbolProfile.connectOrCreate.create.dataSource, GATHER_ASSET_PROFILE_PROCESS,
symbol: data.SymbolProfile.connectOrCreate.create.symbol {
}); dataSource: data.SymbolProfile.connectOrCreate.create.dataSource,
symbol: data.SymbolProfile.connectOrCreate.create.symbol
},
GATHER_ASSET_PROFILE_PROCESS_OPTIONS
);
const isDraft = isAfter(data.date as Date, endOfToday()); const isDraft = isAfter(data.date as Date, endOfToday());
@ -138,8 +136,6 @@ export class OrderService {
]); ]);
} }
await this.cacheService.flush();
delete data.accountId; delete data.accountId;
delete data.assetClass; delete data.assetClass;
delete data.assetSubClass; delete data.assetSubClass;
@ -330,8 +326,6 @@ export class OrderService {
} }
} }
await this.cacheService.flush();
delete data.assetClass; delete data.assetClass;
delete data.assetSubClass; delete data.assetSubClass;
delete data.currency; delete data.currency;

@ -1,11 +1,9 @@
import { import {
DATA_GATHERING_QUEUE, GATHER_ASSET_PROFILE_PROCESS,
GATHER_ASSET_PROFILE_PROCESS GATHER_ASSET_PROFILE_PROCESS_OPTIONS
} from '@ghostfolio/common/config'; } from '@ghostfolio/common/config';
import { InjectQueue } from '@nestjs/bull';
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule'; import { Cron, CronExpression } from '@nestjs/schedule';
import { Queue } from 'bull';
import { DataGatheringService } from './data-gathering.service'; import { DataGatheringService } from './data-gathering.service';
import { ExchangeRateDataService } from './exchange-rate-data.service'; import { ExchangeRateDataService } from './exchange-rate-data.service';
@ -14,15 +12,13 @@ import { TwitterBotService } from './twitter-bot/twitter-bot.service';
@Injectable() @Injectable()
export class CronService { export class CronService {
public constructor( public constructor(
@InjectQueue(DATA_GATHERING_QUEUE)
private readonly dataGatheringQueue: Queue,
private readonly dataGatheringService: DataGatheringService, private readonly dataGatheringService: DataGatheringService,
private readonly exchangeRateDataService: ExchangeRateDataService, private readonly exchangeRateDataService: ExchangeRateDataService,
private readonly twitterBotService: TwitterBotService private readonly twitterBotService: TwitterBotService
) {} ) {}
@Cron(CronExpression.EVERY_MINUTE) @Cron(CronExpression.EVERY_HOUR)
public async runEveryMinute() { public async runEveryHour() {
await this.dataGatheringService.gather7Days(); await this.dataGatheringService.gather7Days();
} }
@ -41,10 +37,14 @@ export class CronService {
const uniqueAssets = await this.dataGatheringService.getUniqueAssets(); const uniqueAssets = await this.dataGatheringService.getUniqueAssets();
for (const { dataSource, symbol } of uniqueAssets) { for (const { dataSource, symbol } of uniqueAssets) {
await this.dataGatheringQueue.add(GATHER_ASSET_PROFILE_PROCESS, { await this.dataGatheringService.addJobToQueue(
dataSource, GATHER_ASSET_PROFILE_PROCESS,
symbol {
}); dataSource,
symbol
},
GATHER_ASSET_PROFILE_PROCESS_OPTIONS
);
} }
} }
} }

@ -6,6 +6,7 @@ import { PrismaModule } from '@ghostfolio/api/services/prisma.module';
import { DATA_GATHERING_QUEUE } from '@ghostfolio/common/config'; import { DATA_GATHERING_QUEUE } from '@ghostfolio/common/config';
import { BullModule } from '@nestjs/bull'; import { BullModule } from '@nestjs/bull';
import { Module } from '@nestjs/common'; import { Module } from '@nestjs/common';
import ms from 'ms';
import { DataGatheringProcessor } from './data-gathering.processor'; import { DataGatheringProcessor } from './data-gathering.processor';
import { ExchangeRateDataModule } from './exchange-rate-data.module'; import { ExchangeRateDataModule } from './exchange-rate-data.module';
@ -14,6 +15,10 @@ import { SymbolProfileModule } from './symbol-profile.module';
@Module({ @Module({
imports: [ imports: [
BullModule.registerQueue({ BullModule.registerQueue({
limiter: {
duration: ms('5 seconds'),
max: 1
},
name: DATA_GATHERING_QUEUE name: DATA_GATHERING_QUEUE
}), }),
ConfigurationModule, ConfigurationModule,

@ -1,19 +1,34 @@
import { import {
DATA_GATHERING_QUEUE, DATA_GATHERING_QUEUE,
GATHER_ASSET_PROFILE_PROCESS GATHER_ASSET_PROFILE_PROCESS,
GATHER_HISTORICAL_MARKET_DATA_PROCESS
} from '@ghostfolio/common/config'; } from '@ghostfolio/common/config';
import { DATE_FORMAT } from '@ghostfolio/common/helper';
import { UniqueAsset } from '@ghostfolio/common/interfaces'; import { UniqueAsset } from '@ghostfolio/common/interfaces';
import { Process, Processor } from '@nestjs/bull'; import { Process, Processor } from '@nestjs/bull';
import { Injectable, Logger } from '@nestjs/common'; import { Injectable, Logger } from '@nestjs/common';
import { Job } from 'bull'; import { Job } from 'bull';
import {
format,
getDate,
getMonth,
getYear,
isBefore,
parseISO
} from 'date-fns';
import { DataGatheringService } from './data-gathering.service'; import { DataGatheringService } from './data-gathering.service';
import { DataProviderService } from './data-provider/data-provider.service';
import { IDataGatheringItem } from './interfaces/interfaces';
import { PrismaService } from './prisma.service';
@Injectable() @Injectable()
@Processor(DATA_GATHERING_QUEUE) @Processor(DATA_GATHERING_QUEUE)
export class DataGatheringProcessor { export class DataGatheringProcessor {
public constructor( public constructor(
private readonly dataGatheringService: DataGatheringService private readonly dataGatheringService: DataGatheringService,
private readonly dataProviderService: DataProviderService,
private readonly prismaService: PrismaService
) {} ) {}
@Process(GATHER_ASSET_PROFILE_PROCESS) @Process(GATHER_ASSET_PROFILE_PROCESS)
@ -21,7 +36,93 @@ export class DataGatheringProcessor {
try { try {
await this.dataGatheringService.gatherAssetProfiles([job.data]); await this.dataGatheringService.gatherAssetProfiles([job.data]);
} catch (error) { } catch (error) {
Logger.error(error, 'DataGatheringProcessor'); Logger.error(
error,
`DataGatheringProcessor (${GATHER_ASSET_PROFILE_PROCESS})`
);
throw new Error(error);
}
}
@Process(GATHER_HISTORICAL_MARKET_DATA_PROCESS)
public async gatherHistoricalMarketData(job: Job<IDataGatheringItem>) {
try {
const { dataSource, date, symbol } = job.data;
const historicalData = await this.dataProviderService.getHistoricalRaw(
[{ dataSource, symbol }],
parseISO(<string>(<unknown>date)),
new Date()
);
let currentDate = parseISO(<string>(<unknown>date));
let lastMarketPrice: number;
while (
isBefore(
currentDate,
new Date(
Date.UTC(
getYear(new Date()),
getMonth(new Date()),
getDate(new Date()),
0
)
)
)
) {
if (
historicalData[symbol]?.[format(currentDate, DATE_FORMAT)]
?.marketPrice
) {
lastMarketPrice =
historicalData[symbol]?.[format(currentDate, DATE_FORMAT)]
?.marketPrice;
}
if (lastMarketPrice) {
try {
await this.prismaService.marketData.create({
data: {
dataSource,
symbol,
date: new Date(
Date.UTC(
getYear(currentDate),
getMonth(currentDate),
getDate(currentDate),
0
)
),
marketPrice: lastMarketPrice
}
});
} catch {}
}
// Count month one up for iteration
currentDate = new Date(
Date.UTC(
getYear(currentDate),
getMonth(currentDate),
getDate(currentDate) + 1,
0
)
);
}
Logger.log(
`Historical market data gathering has been completed for ${symbol} (${dataSource}).`,
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS})`
);
} catch (error) {
Logger.error(
error,
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS})`
);
throw new Error(error);
} }
} }
} }

@ -1,21 +1,17 @@
import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile.service'; import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile.service';
import { import {
PROPERTY_LAST_DATA_GATHERING, DATA_GATHERING_QUEUE,
PROPERTY_LOCKED_DATA_GATHERING GATHER_HISTORICAL_MARKET_DATA_PROCESS,
GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS,
QUEUE_JOB_STATUS_LIST
} from '@ghostfolio/common/config'; } from '@ghostfolio/common/config';
import { DATE_FORMAT, resetHours } from '@ghostfolio/common/helper'; import { DATE_FORMAT, resetHours } from '@ghostfolio/common/helper';
import { UniqueAsset } from '@ghostfolio/common/interfaces'; import { UniqueAsset } from '@ghostfolio/common/interfaces';
import { InjectQueue } from '@nestjs/bull';
import { Inject, Injectable, Logger } from '@nestjs/common'; import { Inject, Injectable, Logger } from '@nestjs/common';
import { DataSource } from '@prisma/client'; import { DataSource } from '@prisma/client';
import { import { JobOptions, Queue } from 'bull';
differenceInHours, import { format, subDays } from 'date-fns';
format,
getDate,
getMonth,
getYear,
isBefore,
subDays
} from 'date-fns';
import { DataProviderService } from './data-provider/data-provider.service'; import { DataProviderService } from './data-provider/data-provider.service';
import { DataEnhancerInterface } from './data-provider/interfaces/data-enhancer.interface'; import { DataEnhancerInterface } from './data-provider/interfaces/data-enhancer.interface';
@ -25,167 +21,48 @@ import { PrismaService } from './prisma.service';
@Injectable() @Injectable()
export class DataGatheringService { export class DataGatheringService {
private dataGatheringProgress: number;
public constructor( public constructor(
@Inject('DataEnhancers') @Inject('DataEnhancers')
private readonly dataEnhancers: DataEnhancerInterface[], private readonly dataEnhancers: DataEnhancerInterface[],
@InjectQueue(DATA_GATHERING_QUEUE)
private readonly dataGatheringQueue: Queue,
private readonly dataProviderService: DataProviderService, private readonly dataProviderService: DataProviderService,
private readonly exchangeRateDataService: ExchangeRateDataService, private readonly exchangeRateDataService: ExchangeRateDataService,
private readonly prismaService: PrismaService, private readonly prismaService: PrismaService,
private readonly symbolProfileService: SymbolProfileService private readonly symbolProfileService: SymbolProfileService
) {} ) {}
public async gather7Days() { public async addJobToQueue(name: string, data: any, options?: JobOptions) {
const isDataGatheringNeeded = await this.isDataGatheringNeeded(); const hasJob = await this.hasJob(name, data);
if (isDataGatheringNeeded) {
Logger.log('7d data gathering has been started.', 'DataGatheringService');
console.time('data-gathering-7d');
await this.prismaService.property.create({
data: {
key: PROPERTY_LOCKED_DATA_GATHERING,
value: new Date().toISOString()
}
});
const symbols = await this.getSymbols7D();
try {
await this.gatherSymbols(symbols);
await this.prismaService.property.upsert({
create: {
key: PROPERTY_LAST_DATA_GATHERING,
value: new Date().toISOString()
},
update: { value: new Date().toISOString() },
where: { key: PROPERTY_LAST_DATA_GATHERING }
});
} catch (error) {
Logger.error(error, 'DataGatheringService');
}
await this.prismaService.property.delete({
where: {
key: PROPERTY_LOCKED_DATA_GATHERING
}
});
if (hasJob) {
Logger.log( Logger.log(
'7d data gathering has been completed.', `Job ${name} with data ${JSON.stringify(data)} already exists.`,
'DataGatheringService' 'DataGatheringService'
); );
console.timeEnd('data-gathering-7d'); } else {
return this.dataGatheringQueue.add(name, data, options);
} }
} }
public async gatherMax() { public async gather7Days() {
const isDataGatheringLocked = await this.prismaService.property.findUnique({ const dataGatheringItems = await this.getSymbols7D();
where: { key: PROPERTY_LOCKED_DATA_GATHERING } await this.gatherSymbols(dataGatheringItems);
}); }
if (!isDataGatheringLocked) {
Logger.log(
'Max data gathering has been started.',
'DataGatheringService'
);
console.time('data-gathering-max');
await this.prismaService.property.create({
data: {
key: PROPERTY_LOCKED_DATA_GATHERING,
value: new Date().toISOString()
}
});
const symbols = await this.getSymbolsMax();
try {
await this.gatherSymbols(symbols);
await this.prismaService.property.upsert({
create: {
key: PROPERTY_LAST_DATA_GATHERING,
value: new Date().toISOString()
},
update: { value: new Date().toISOString() },
where: { key: PROPERTY_LAST_DATA_GATHERING }
});
} catch (error) {
Logger.error(error, 'DataGatheringService');
}
await this.prismaService.property.delete({
where: {
key: PROPERTY_LOCKED_DATA_GATHERING
}
});
Logger.log( public async gatherMax() {
'Max data gathering has been completed.', const dataGatheringItems = await this.getSymbolsMax();
'DataGatheringService' await this.gatherSymbols(dataGatheringItems);
);
console.timeEnd('data-gathering-max');
}
} }
public async gatherSymbol({ dataSource, symbol }: UniqueAsset) { public async gatherSymbol({ dataSource, symbol }: UniqueAsset) {
const isDataGatheringLocked = await this.prismaService.property.findUnique({ const symbols = (await this.getSymbolsMax()).filter((dataGatheringItem) => {
where: { key: PROPERTY_LOCKED_DATA_GATHERING } return (
}); dataGatheringItem.dataSource === dataSource &&
dataGatheringItem.symbol === symbol
if (!isDataGatheringLocked) {
Logger.log(
`Symbol data gathering for ${symbol} has been started.`,
'DataGatheringService'
);
console.time('data-gathering-symbol');
await this.prismaService.property.create({
data: {
key: PROPERTY_LOCKED_DATA_GATHERING,
value: new Date().toISOString()
}
});
const symbols = (await this.getSymbolsMax()).filter(
(dataGatheringItem) => {
return (
dataGatheringItem.dataSource === dataSource &&
dataGatheringItem.symbol === symbol
);
}
);
try {
await this.gatherSymbols(symbols);
await this.prismaService.property.upsert({
create: {
key: PROPERTY_LAST_DATA_GATHERING,
value: new Date().toISOString()
},
update: { value: new Date().toISOString() },
where: { key: PROPERTY_LAST_DATA_GATHERING }
});
} catch (error) {
Logger.error(error, 'DataGatheringService');
}
await this.prismaService.property.delete({
where: {
key: PROPERTY_LOCKED_DATA_GATHERING
}
});
Logger.log(
`Symbol data gathering for ${symbol} has been completed.`,
'DataGatheringService'
); );
console.timeEnd('data-gathering-symbol'); });
} await this.gatherSymbols(symbols);
} }
public async gatherSymbolForDate({ public async gatherSymbolForDate({
@ -235,15 +112,6 @@ export class DataGatheringService {
uniqueAssets = await this.getUniqueAssets(); uniqueAssets = await this.getUniqueAssets();
} }
Logger.log(
`Asset profile data gathering has been started for ${uniqueAssets
.map(({ dataSource, symbol }) => {
return `${symbol} (${dataSource})`;
})
.join(',')}.`,
'DataGatheringService'
);
const assetProfiles = await this.dataProviderService.getAssetProfiles( const assetProfiles = await this.dataProviderService.getAssetProfiles(
uniqueAssets uniqueAssets
); );
@ -334,136 +202,21 @@ export class DataGatheringService {
} }
public async gatherSymbols(aSymbolsWithStartDate: IDataGatheringItem[]) { public async gatherSymbols(aSymbolsWithStartDate: IDataGatheringItem[]) {
let hasError = false;
let symbolCounter = 0;
for (const { dataSource, date, symbol } of aSymbolsWithStartDate) { for (const { dataSource, date, symbol } of aSymbolsWithStartDate) {
if (dataSource === 'MANUAL') { if (dataSource === 'MANUAL') {
continue; continue;
} }
this.dataGatheringProgress = symbolCounter / aSymbolsWithStartDate.length; await this.addJobToQueue(
GATHER_HISTORICAL_MARKET_DATA_PROCESS,
try { {
const historicalData = await this.dataProviderService.getHistoricalRaw( dataSource,
[{ dataSource, symbol }],
date, date,
new Date() symbol
); },
GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS
let currentDate = date; );
let lastMarketPrice: number;
while (
isBefore(
currentDate,
new Date(
Date.UTC(
getYear(new Date()),
getMonth(new Date()),
getDate(new Date()),
0
)
)
)
) {
if (
historicalData[symbol]?.[format(currentDate, DATE_FORMAT)]
?.marketPrice
) {
lastMarketPrice =
historicalData[symbol]?.[format(currentDate, DATE_FORMAT)]
?.marketPrice;
}
if (lastMarketPrice) {
try {
await this.prismaService.marketData.create({
data: {
dataSource,
symbol,
date: new Date(
Date.UTC(
getYear(currentDate),
getMonth(currentDate),
getDate(currentDate),
0
)
),
marketPrice: lastMarketPrice
}
});
} catch {}
} else {
Logger.warn(
`Failed to gather data for symbol ${symbol} from ${dataSource} at ${format(
currentDate,
DATE_FORMAT
)}.`,
'DataGatheringService'
);
}
// Count month one up for iteration
currentDate = new Date(
Date.UTC(
getYear(currentDate),
getMonth(currentDate),
getDate(currentDate) + 1,
0
)
);
}
} catch (error) {
hasError = true;
Logger.error(error, 'DataGatheringService');
}
if (symbolCounter > 0 && symbolCounter % 100 === 0) {
Logger.log(
`Data gathering progress: ${(
this.dataGatheringProgress * 100
).toFixed(2)}%`,
'DataGatheringService'
);
}
symbolCounter += 1;
}
await this.exchangeRateDataService.initialize();
if (hasError) {
throw '';
}
}
public async getDataGatheringProgress() {
const isInProgress = await this.getIsInProgress();
if (isInProgress) {
return this.dataGatheringProgress;
}
return undefined;
}
public async getIsInProgress() {
return await this.prismaService.property.findUnique({
where: { key: PROPERTY_LOCKED_DATA_GATHERING }
});
}
public async getLastDataGathering() {
const lastDataGathering = await this.prismaService.property.findUnique({
where: { key: PROPERTY_LAST_DATA_GATHERING }
});
if (lastDataGathering?.value) {
return new Date(lastDataGathering.value);
} }
return undefined;
} }
public async getSymbolsMax(): Promise<IDataGatheringItem[]> { public async getSymbolsMax(): Promise<IDataGatheringItem[]> {
@ -534,19 +287,6 @@ export class DataGatheringService {
}); });
} }
public async reset() {
Logger.log('Data gathering has been reset.', 'DataGatheringService');
await this.prismaService.property.deleteMany({
where: {
OR: [
{ key: PROPERTY_LAST_DATA_GATHERING },
{ key: PROPERTY_LOCKED_DATA_GATHERING }
]
}
});
}
private async getSymbols7D(): Promise<IDataGatheringItem[]> { private async getSymbols7D(): Promise<IDataGatheringItem[]> {
const startDate = subDays(resetHours(new Date()), 7); const startDate = subDays(resetHours(new Date()), 7);
@ -610,15 +350,17 @@ export class DataGatheringService {
return [...currencyPairsToGather, ...symbolProfilesToGather]; return [...currencyPairsToGather, ...symbolProfilesToGather];
} }
private async isDataGatheringNeeded() { private async hasJob(name: string, data: any) {
const lastDataGathering = await this.getLastDataGathering(); const jobs = await this.dataGatheringQueue.getJobs(
QUEUE_JOB_STATUS_LIST.filter((status) => {
return status !== 'completed';
})
);
const isDataGatheringLocked = await this.prismaService.property.findUnique({ return jobs.some((job) => {
where: { key: PROPERTY_LOCKED_DATA_GATHERING } return (
job.name === name && JSON.stringify(job.data) === JSON.stringify(data)
);
}); });
const diffInHours = differenceInHours(new Date(), lastDataGathering);
return (diffInHours >= 1 || !lastDataGathering) && !isDataGatheringLocked;
} }
} }

@ -9,7 +9,7 @@ import { DATE_FORMAT } from '@ghostfolio/common/helper';
import { Granularity } from '@ghostfolio/common/types'; import { Granularity } from '@ghostfolio/common/types';
import { Injectable, Logger } from '@nestjs/common'; import { Injectable, Logger } from '@nestjs/common';
import { DataSource, SymbolProfile } from '@prisma/client'; import { DataSource, SymbolProfile } from '@prisma/client';
import { isAfter, isBefore, parse } from 'date-fns'; import { format, isAfter, isBefore, parse } from 'date-fns';
import { IAlphaVantageHistoricalResponse } from './interfaces/interfaces'; import { IAlphaVantageHistoricalResponse } from './interfaces/interfaces';
@ -76,9 +76,12 @@ export class AlphaVantageService implements DataProviderInterface {
return response; return response;
} catch (error) { } catch (error) {
Logger.error(error, 'AlphaVantageService'); throw new Error(
`Could not get historical market data for ${aSymbol} (${this.getName()}) from ${format(
return {}; from,
DATE_FORMAT
)} to ${format(to, DATE_FORMAT)}: [${error.name}] ${error.message}`
);
} }
} }

@ -72,10 +72,13 @@ export class EodHistoricalDataService implements DataProviderInterface {
{ [aSymbol]: {} } { [aSymbol]: {} }
); );
} catch (error) { } catch (error) {
Logger.error(error, 'EodHistoricalDataService'); throw new Error(
`Could not get historical market data for ${aSymbol} (${this.getName()}) from ${format(
from,
DATE_FORMAT
)} to ${format(to, DATE_FORMAT)}: [${error.name}] ${error.message}`
);
} }
return {};
} }
public getName(): DataSource { public getName(): DataSource {

@ -87,10 +87,13 @@ export class GhostfolioScraperApiService implements DataProviderInterface {
} }
}; };
} catch (error) { } catch (error) {
Logger.error(error, 'GhostfolioScraperApiService'); throw new Error(
`Could not get historical market data for ${aSymbol} (${this.getName()}) from ${format(
from,
DATE_FORMAT
)} to ${format(to, DATE_FORMAT)}: [${error.name}] ${error.message}`
);
} }
return {};
} }
public getName(): DataSource { public getName(): DataSource {

@ -71,10 +71,13 @@ export class GoogleSheetsService implements DataProviderInterface {
[symbol]: historicalData [symbol]: historicalData
}; };
} catch (error) { } catch (error) {
Logger.error(error, 'GoogleSheetsService'); throw new Error(
`Could not get historical market data for ${aSymbol} (${this.getName()}) from ${format(
from,
DATE_FORMAT
)} to ${format(to, DATE_FORMAT)}: [${error.name}] ${error.message}`
);
} }
return {};
} }
public getName(): DataSource { public getName(): DataSource {

@ -90,7 +90,14 @@ export class RakutenRapidApiService implements DataProviderInterface {
} }
}; };
} }
} catch (error) {} } catch (error) {
throw new Error(
`Could not get historical market data for ${aSymbol} (${this.getName()}) from ${format(
from,
DATE_FORMAT
)} to ${format(to, DATE_FORMAT)}: [${error.name}] ${error.message}`
);
}
return {}; return {};
} }

@ -131,7 +131,13 @@ export class YahooFinanceService implements DataProviderInterface {
if (url) { if (url) {
response.url = url; response.url = url;
} }
} catch {} } catch (error) {
throw new Error(
`Could not get asset profile for ${aSymbol} (${this.getName()}): [${
error.name
}] ${error.message}`
);
}
return response; return response;
} }
@ -185,12 +191,12 @@ export class YahooFinanceService implements DataProviderInterface {
return response; return response;
} catch (error) { } catch (error) {
Logger.warn( throw new Error(
`Skipping yahooFinance2.getHistorical("${aSymbol}"): [${error.name}] ${error.message}`, `Could not get historical market data for ${aSymbol} (${this.getName()}) from ${format(
'YahooFinanceService' from,
DATE_FORMAT
)} to ${format(to, DATE_FORMAT)}: [${error.name}] ${error.message}`
); );
return {};
} }
} }

@ -1,3 +1,4 @@
import { UniqueAsset } from '@ghostfolio/common/interfaces';
import { MarketState } from '@ghostfolio/common/types'; import { MarketState } from '@ghostfolio/common/types';
import { import {
Account, Account,
@ -32,8 +33,6 @@ export interface IDataProviderResponse {
marketState: MarketState; marketState: MarketState;
} }
export interface IDataGatheringItem { export interface IDataGatheringItem extends UniqueAsset {
dataSource: DataSource;
date?: Date; date?: Date;
symbol: string;
} }

@ -5,10 +5,13 @@ import {
OnDestroy, OnDestroy,
OnInit OnInit
} from '@angular/core'; } from '@angular/core';
import { FormBuilder, FormGroup } from '@angular/forms';
import { AdminService } from '@ghostfolio/client/services/admin.service'; import { AdminService } from '@ghostfolio/client/services/admin.service';
import { UserService } from '@ghostfolio/client/services/user/user.service'; import { UserService } from '@ghostfolio/client/services/user/user.service';
import { QUEUE_JOB_STATUS_LIST } from '@ghostfolio/common/config';
import { getDateWithTimeFormatString } from '@ghostfolio/common/helper'; import { getDateWithTimeFormatString } from '@ghostfolio/common/helper';
import { AdminJobs, User } from '@ghostfolio/common/interfaces'; import { AdminJobs, User } from '@ghostfolio/common/interfaces';
import { JobStatus } from 'bull';
import { Subject } from 'rxjs'; import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators'; import { takeUntil } from 'rxjs/operators';
@ -20,7 +23,9 @@ import { takeUntil } from 'rxjs/operators';
}) })
export class AdminJobsComponent implements OnDestroy, OnInit { export class AdminJobsComponent implements OnDestroy, OnInit {
public defaultDateTimeFormat: string; public defaultDateTimeFormat: string;
public filterForm: FormGroup;
public jobs: AdminJobs['jobs'] = []; public jobs: AdminJobs['jobs'] = [];
public statusFilterOptions = QUEUE_JOB_STATUS_LIST;
public user: User; public user: User;
private unsubscribeSubject = new Subject<void>(); private unsubscribeSubject = new Subject<void>();
@ -31,6 +36,7 @@ export class AdminJobsComponent implements OnDestroy, OnInit {
public constructor( public constructor(
private adminService: AdminService, private adminService: AdminService,
private changeDetectorRef: ChangeDetectorRef, private changeDetectorRef: ChangeDetectorRef,
private formBuilder: FormBuilder,
private userService: UserService private userService: UserService
) { ) {
this.userService.stateChanged this.userService.stateChanged
@ -50,9 +56,40 @@ export class AdminJobsComponent implements OnDestroy, OnInit {
* Initializes the controller * Initializes the controller
*/ */
public ngOnInit() { public ngOnInit() {
this.filterForm = this.formBuilder.group({
status: []
});
this.filterForm.valueChanges
.pipe(takeUntil(this.unsubscribeSubject))
.subscribe(() => {
const currentFilter = this.filterForm.get('status').value;
this.fetchJobs(currentFilter ? [currentFilter] : undefined);
});
this.fetchJobs(); this.fetchJobs();
} }
public onDeleteJob(aId: string) {
this.adminService
.deleteJob(aId)
.pipe(takeUntil(this.unsubscribeSubject))
.subscribe(() => {
this.fetchJobs();
});
}
public onDeleteJobs() {
const currentFilter = this.filterForm.get('status').value;
this.adminService
.deleteJobs({ status: currentFilter ? [currentFilter] : undefined })
.pipe(takeUntil(this.unsubscribeSubject))
.subscribe(() => {
this.fetchJobs(currentFilter ? [currentFilter] : undefined);
});
}
public onViewStacktrace(aStacktrace: AdminJobs['jobs'][0]['stacktrace']) { public onViewStacktrace(aStacktrace: AdminJobs['jobs'][0]['stacktrace']) {
alert(JSON.stringify(aStacktrace, null, ' ')); alert(JSON.stringify(aStacktrace, null, ' '));
} }
@ -62,9 +99,9 @@ export class AdminJobsComponent implements OnDestroy, OnInit {
this.unsubscribeSubject.complete(); this.unsubscribeSubject.complete();
} }
private fetchJobs() { private fetchJobs(aStatus?: JobStatus[]) {
this.adminService this.adminService
.fetchJobs() .fetchJobs({ status: aStatus })
.pipe(takeUntil(this.unsubscribeSubject)) .pipe(takeUntil(this.unsubscribeSubject))
.subscribe(({ jobs }) => { .subscribe(({ jobs }) => {
this.jobs = jobs; this.jobs = jobs;

@ -1,13 +1,34 @@
<div class="container"> <div class="container">
<div class="row"> <div class="row">
<div class="col"> <div class="col">
<form class="align-items-center d-flex" [formGroup]="filterForm">
<mat-form-field appearance="outline" class="flex-grow-1">
<mat-select formControlName="status">
<mat-option></mat-option>
<mat-option
*ngFor="let statusFilterOption of statusFilterOptions"
[value]="statusFilterOption"
>{{ statusFilterOption }}</mat-option
>
</mat-select>
</mat-form-field>
<button
class="ml-1"
color="warn"
mat-flat-button
(click)="onDeleteJobs()"
>
<span i18n>Delete Jobs</span>
</button>
</form>
<table class="gf-table w-100"> <table class="gf-table w-100">
<thead> <thead>
<tr class="mat-header-row"> <tr class="mat-header-row">
<th class="mat-header-cell px-1 py-2" i18n>#</th> <th class="mat-header-cell px-1 py-2 text-right" i18n>#</th>
<th class="mat-header-cell px-1 py-2" i18n>Type</th> <th class="mat-header-cell px-1 py-2" i18n>Type</th>
<th class="mat-header-cell px-1 py-2" i18n>Data Source</th>
<th class="mat-header-cell px-1 py-2" i18n>Symbol</th> <th class="mat-header-cell px-1 py-2" i18n>Symbol</th>
<th class="mat-header-cell px-1 py-2" i18n>Data Source</th>
<th class="mat-header-cell px-1 py-2 text-right" i18n>Attempts</th>
<th class="mat-header-cell px-1 py-2" i18n>Created</th> <th class="mat-header-cell px-1 py-2" i18n>Created</th>
<th class="mat-header-cell px-1 py-2" i18n>Finished</th> <th class="mat-header-cell px-1 py-2" i18n>Finished</th>
<th class="mat-header-cell px-1 py-2" i18n>Status</th> <th class="mat-header-cell px-1 py-2" i18n>Status</th>
@ -17,10 +38,28 @@
<tbody> <tbody>
<ng-container *ngFor="let job of jobs"> <ng-container *ngFor="let job of jobs">
<tr class="mat-row"> <tr class="mat-row">
<td class="mat-cell px-1 py-2">{{ job.id }}</td> <td class="mat-cell px-1 py-2 text-right">{{ job.id }}</td>
<td class="mat-cell px-1 py-2">{{ job.name }}</td> <td class="mat-cell px-1 py-2">
<td class="mat-cell px-1 py-2">{{ job.data?.dataSource }}</td> <span class="align-items-center d-flex">
<ion-icon
class="mr-1"
name="arrow-down-circle-outline"
></ion-icon>
<ng-container *ngIf="job.name === 'GATHER_ASSET_PROFILE'">
<span i18n>Asset Profile</span>
</ng-container>
<ng-container
*ngIf="job.name === 'GATHER_HISTORICAL_MARKET_DATA'"
>
<span i18n>Historical Market Data</span>
</ng-container>
</span>
</td>
<td class="mat-cell px-1 py-2">{{ job.data?.symbol }}</td> <td class="mat-cell px-1 py-2">{{ job.data?.symbol }}</td>
<td class="mat-cell px-1 py-2">{{ job.data?.dataSource }}</td>
<td class="mat-cell px-1 py-2 text-right">
{{ job.attemptsMade }}
</td>
<td class="mat-cell px-1 py-2"> <td class="mat-cell px-1 py-2">
{{ job.timestamp | date: defaultDateTimeFormat }} {{ job.timestamp | date: defaultDateTimeFormat }}
</td> </td>
@ -29,21 +68,32 @@
</td> </td>
<td class="mat-cell px-1 py-2"> <td class="mat-cell px-1 py-2">
<ion-icon <ion-icon
*ngIf="job.finishedOn" *ngIf="job.state === 'active'"
name="play-outline"
></ion-icon>
<ion-icon
*ngIf="job.state === 'completed'"
class="text-success" class="text-success"
name="checkmark-circle-outline" name="checkmark-circle-outline"
></ion-icon> ></ion-icon>
<ng-container *ngIf="!job.finishedOn"> <ion-icon
<ion-icon *ngIf="job.state === 'delayed'"
*ngIf="job.stacktrace?.length >= 1" name="time-outline"
class="text-danger" [ngClass]="{ 'text-danger': job.stacktrace?.length > 0 }"
name="alert-circle-outline" ></ion-icon>
></ion-icon> <ion-icon
<ion-icon *ngIf="job.state === 'failed'"
*ngIf="job.stacktrace?.length < 1" class="text-danger"
name="time-outline" name="alert-circle-outline"
></ion-icon> ></ion-icon>
</ng-container> <ion-icon
*ngIf="job.state === 'paused'"
name="pause-outline"
></ion-icon>
<ion-icon
*ngIf="job.state === 'waiting'"
name="cafe-outline"
></ion-icon>
</td> </td>
<td class="mat-cell px-1 py-2"> <td class="mat-cell px-1 py-2">
<button <button
@ -58,11 +108,14 @@
<button <button
i18n i18n
mat-menu-item mat-menu-item
[disabled]="job.stacktrace?.length < 1" [disabled]="job.stacktrace?.length <= 0"
(click)="onViewStacktrace(job.stacktrace)" (click)="onViewStacktrace(job.stacktrace)"
> >
View Stacktrace View Stacktrace
</button> </button>
<button i18n mat-menu-item (click)="onDeleteJob(job.id)">
Delete Job
</button>
</mat-menu> </mat-menu>
</td> </td>
</tr> </tr>

@ -1,13 +1,22 @@
import { CommonModule } from '@angular/common'; import { CommonModule } from '@angular/common';
import { CUSTOM_ELEMENTS_SCHEMA, NgModule } from '@angular/core'; import { CUSTOM_ELEMENTS_SCHEMA, NgModule } from '@angular/core';
import { FormsModule, ReactiveFormsModule } from '@angular/forms';
import { MatButtonModule } from '@angular/material/button'; import { MatButtonModule } from '@angular/material/button';
import { MatMenuModule } from '@angular/material/menu'; import { MatMenuModule } from '@angular/material/menu';
import { MatSelectModule } from '@angular/material/select';
import { AdminJobsComponent } from './admin-jobs.component'; import { AdminJobsComponent } from './admin-jobs.component';
@NgModule({ @NgModule({
declarations: [AdminJobsComponent], declarations: [AdminJobsComponent],
imports: [CommonModule, MatButtonModule, MatMenuModule], imports: [
CommonModule,
FormsModule,
MatButtonModule,
MatMenuModule,
MatSelectModule,
ReactiveFormsModule
],
schemas: [CUSTOM_ELEMENTS_SCHEMA] schemas: [CUSTOM_ELEMENTS_SCHEMA]
}) })
export class GfAdminJobsModule {} export class GfAdminJobsModule {}

@ -15,7 +15,6 @@ import { hasPermission, permissions } from '@ghostfolio/common/permissions';
import { import {
differenceInSeconds, differenceInSeconds,
formatDistanceToNowStrict, formatDistanceToNowStrict,
isValid,
parseISO parseISO
} from 'date-fns'; } from 'date-fns';
import { uniq } from 'lodash'; import { uniq } from 'lodash';
@ -32,14 +31,11 @@ export class AdminOverviewComponent implements OnDestroy, OnInit {
public couponDuration: StringValue = '30 days'; public couponDuration: StringValue = '30 days';
public coupons: Coupon[]; public coupons: Coupon[];
public customCurrencies: string[]; public customCurrencies: string[];
public dataGatheringInProgress: boolean;
public dataGatheringProgress: number;
public exchangeRates: { label1: string; label2: string; value: number }[]; public exchangeRates: { label1: string; label2: string; value: number }[];
public hasPermissionForSubscription: boolean; public hasPermissionForSubscription: boolean;
public hasPermissionForSystemMessage: boolean; public hasPermissionForSystemMessage: boolean;
public hasPermissionToToggleReadOnlyMode: boolean; public hasPermissionToToggleReadOnlyMode: boolean;
public info: InfoItem; public info: InfoItem;
public lastDataGathering: string;
public transactionCount: number; public transactionCount: number;
public userCount: number; public userCount: number;
public user: User; public user: User;
@ -128,7 +124,7 @@ export class AdminOverviewComponent implements OnDestroy, OnInit {
public onDeleteCoupon(aCouponCode: string) { public onDeleteCoupon(aCouponCode: string) {
const confirmation = confirm('Do you really want to delete this coupon?'); const confirmation = confirm('Do you really want to delete this coupon?');
if (confirmation) { if (confirmation === true) {
const coupons = this.coupons.filter((coupon) => { const coupons = this.coupons.filter((coupon) => {
return coupon.code !== aCouponCode; return coupon.code !== aCouponCode;
}); });
@ -139,7 +135,7 @@ export class AdminOverviewComponent implements OnDestroy, OnInit {
public onDeleteCurrency(aCurrency: string) { public onDeleteCurrency(aCurrency: string) {
const confirmation = confirm('Do you really want to delete this currency?'); const confirmation = confirm('Do you really want to delete this currency?');
if (confirmation) { if (confirmation === true) {
const currencies = this.customCurrencies.filter((currency) => { const currencies = this.customCurrencies.filter((currency) => {
return currency !== aCurrency; return currency !== aCurrency;
}); });
@ -152,24 +148,11 @@ export class AdminOverviewComponent implements OnDestroy, OnInit {
} }
public onFlushCache() { public onFlushCache() {
this.cacheService const confirmation = confirm('Do you really want to flush the cache?');
.flush()
.pipe(takeUntil(this.unsubscribeSubject))
.subscribe(() => {
setTimeout(() => {
window.location.reload();
}, 300);
});
}
public onGatherMax() {
const confirmation = confirm(
'This action may take some time. Do you want to proceed?'
);
if (confirmation === true) { if (confirmation === true) {
this.adminService this.cacheService
.gatherMax() .flush()
.pipe(takeUntil(this.unsubscribeSubject)) .pipe(takeUntil(this.unsubscribeSubject))
.subscribe(() => { .subscribe(() => {
setTimeout(() => { setTimeout(() => {
@ -179,6 +162,28 @@ export class AdminOverviewComponent implements OnDestroy, OnInit {
} }
} }
public onGather7Days() {
this.adminService
.gather7Days()
.pipe(takeUntil(this.unsubscribeSubject))
.subscribe(() => {
setTimeout(() => {
window.location.reload();
}, 300);
});
}
public onGatherMax() {
this.adminService
.gatherMax()
.pipe(takeUntil(this.unsubscribeSubject))
.subscribe(() => {
setTimeout(() => {
window.location.reload();
}, 300);
});
}
public onGatherProfileData() { public onGatherProfileData() {
this.adminService this.adminService
.gatherProfileData() .gatherProfileData()
@ -207,39 +212,15 @@ export class AdminOverviewComponent implements OnDestroy, OnInit {
this.dataService this.dataService
.fetchAdminData() .fetchAdminData()
.pipe(takeUntil(this.unsubscribeSubject)) .pipe(takeUntil(this.unsubscribeSubject))
.subscribe( .subscribe(({ exchangeRates, settings, transactionCount, userCount }) => {
({ this.coupons = (settings[PROPERTY_COUPONS] as Coupon[]) ?? [];
dataGatheringProgress, this.customCurrencies = settings[PROPERTY_CURRENCIES] as string[];
exchangeRates, this.exchangeRates = exchangeRates;
lastDataGathering, this.transactionCount = transactionCount;
settings, this.userCount = userCount;
transactionCount,
userCount this.changeDetectorRef.markForCheck();
}) => { });
this.coupons = (settings[PROPERTY_COUPONS] as Coupon[]) ?? [];
this.customCurrencies = settings[PROPERTY_CURRENCIES] as string[];
this.dataGatheringProgress = dataGatheringProgress;
this.exchangeRates = exchangeRates;
if (isValid(parseISO(lastDataGathering?.toString()))) {
this.lastDataGathering = formatDistanceToNowStrict(
new Date(lastDataGathering),
{
addSuffix: true
}
);
} else if (lastDataGathering === 'IN_PROGRESS') {
this.dataGatheringInProgress = true;
} else {
this.lastDataGathering = 'Starting soon...';
}
this.transactionCount = transactionCount;
this.userCount = userCount;
this.changeDetectorRef.markForCheck();
}
);
} }
private generateCouponCode(aLength: number) { private generateCouponCode(aLength: number) {

@ -19,37 +19,30 @@
<div class="d-flex my-3"> <div class="d-flex my-3">
<div class="w-50" i18n>Data Gathering</div> <div class="w-50" i18n>Data Gathering</div>
<div class="w-50"> <div class="w-50">
<div> <div class="overflow-hidden">
<ng-container *ngIf="lastDataGathering"
>{{ lastDataGathering }}</ng-container
>
<ng-container *ngIf="dataGatheringInProgress" i18n
>In Progress ({{ dataGatheringProgress | percent : '1.2-2'
}})</ng-container
>
</div>
<div class="mt-2 overflow-hidden">
<div class="mb-2"> <div class="mb-2">
<button <button
color="accent" color="accent"
mat-flat-button mat-flat-button
(click)="onFlushCache()" (click)="onGather7Days()"
> >
<ion-icon <ion-icon
class="mr-1" class="mr-1"
name="close-circle-outline" name="cloud-download-outline"
></ion-icon> ></ion-icon>
<span i18n>Reset Data Gathering</span> <span i18n>Gather Recent Data</span>
</button> </button>
</div> </div>
<div class="mb-2"> <div class="mb-2">
<button <button
color="warn" color="accent"
mat-flat-button mat-flat-button
[disabled]="dataGatheringInProgress"
(click)="onGatherMax()" (click)="onGatherMax()"
> >
<ion-icon class="mr-1" name="warning-outline"></ion-icon> <ion-icon
class="mr-1"
name="cloud-download-outline"
></ion-icon>
<span i18n>Gather All Data</span> <span i18n>Gather All Data</span>
</button> </button>
</div> </div>
@ -58,7 +51,6 @@
class="mb-2 mr-2" class="mb-2 mr-2"
color="accent" color="accent"
mat-flat-button mat-flat-button
[disabled]="dataGatheringInProgress"
(click)="onGatherProfileData()" (click)="onGatherProfileData()"
> >
<ion-icon <ion-icon
@ -97,7 +89,6 @@
*ngIf="customCurrencies.includes(exchangeRate.label2)" *ngIf="customCurrencies.includes(exchangeRate.label2)"
class="mini-icon mx-1 no-min-width px-2" class="mini-icon mx-1 no-min-width px-2"
mat-button mat-button
[disabled]="dataGatheringInProgress"
(click)="onDeleteCurrency(exchangeRate.label2)" (click)="onDeleteCurrency(exchangeRate.label2)"
> >
<ion-icon name="trash-outline"></ion-icon> <ion-icon name="trash-outline"></ion-icon>
@ -109,7 +100,6 @@
<button <button
color="primary" color="primary"
mat-flat-button mat-flat-button
[disabled]="dataGatheringInProgress"
(click)="onAddCurrency()" (click)="onAddCurrency()"
> >
<ion-icon class="mr-1" name="add-outline"></ion-icon> <ion-icon class="mr-1" name="add-outline"></ion-icon>
@ -126,7 +116,6 @@
<button <button
class="mini-icon mx-1 no-min-width px-2" class="mini-icon mx-1 no-min-width px-2"
mat-button mat-button
[disabled]="dataGatheringInProgress"
(click)="onDeleteSystemMessage()" (click)="onDeleteSystemMessage()"
> >
<ion-icon name="trash-outline"></ion-icon> <ion-icon name="trash-outline"></ion-icon>
@ -197,6 +186,15 @@
</div> </div>
</div> </div>
</div> </div>
<div class="d-flex my-3">
<div class="w-50" i18n>Housekeeping</div>
<div class="w-50">
<button color="warn" mat-flat-button (click)="onFlushCache()">
<ion-icon class="mr-1" name="close-circle-outline"></ion-icon>
<span i18n>Flush Cache</span>
</button>
</div>
</div>
</mat-card-content> </mat-card-content>
</mat-card> </mat-card>
</div> </div>

@ -1,4 +1,4 @@
import { HttpClient } from '@angular/common/http'; import { HttpClient, HttpParams } from '@angular/common/http';
import { Injectable } from '@angular/core'; import { Injectable } from '@angular/core';
import { UpdateMarketDataDto } from '@ghostfolio/api/app/admin/update-market-data.dto'; import { UpdateMarketDataDto } from '@ghostfolio/api/app/admin/update-market-data.dto';
import { IDataProviderHistoricalResponse } from '@ghostfolio/api/services/interfaces/interfaces'; import { IDataProviderHistoricalResponse } from '@ghostfolio/api/services/interfaces/interfaces';
@ -9,6 +9,7 @@ import {
UniqueAsset UniqueAsset
} from '@ghostfolio/common/interfaces'; } from '@ghostfolio/common/interfaces';
import { DataSource, MarketData } from '@prisma/client'; import { DataSource, MarketData } from '@prisma/client';
import { JobStatus } from 'bull';
import { format, parseISO } from 'date-fns'; import { format, parseISO } from 'date-fns';
import { Observable, map } from 'rxjs'; import { Observable, map } from 'rxjs';
@ -18,6 +19,22 @@ import { Observable, map } from 'rxjs';
export class AdminService { export class AdminService {
public constructor(private http: HttpClient) {} public constructor(private http: HttpClient) {}
public deleteJob(aId: string) {
return this.http.delete<void>(`/api/v1/admin/queue/job/${aId}`);
}
public deleteJobs({ status }: { status: JobStatus[] }) {
let params = new HttpParams();
if (status?.length > 0) {
params = params.append('status', status.join(','));
}
return this.http.delete<void>('/api/v1/admin/queue/job', {
params
});
}
public deleteProfileData({ dataSource, symbol }: UniqueAsset) { public deleteProfileData({ dataSource, symbol }: UniqueAsset) {
return this.http.delete<void>( return this.http.delete<void>(
`/api/v1/admin/profile-data/${dataSource}/${symbol}` `/api/v1/admin/profile-data/${dataSource}/${symbol}`
@ -43,16 +60,28 @@ export class AdminService {
); );
} }
public fetchJobs() { public fetchJobs({ status }: { status?: JobStatus[] }) {
return this.http.get<AdminJobs>(`/api/v1/admin/queue/jobs`); let params = new HttpParams();
if (status?.length > 0) {
params = params.append('status', status.join(','));
}
return this.http.get<AdminJobs>('/api/v1/admin/queue/job', {
params
});
}
public gather7Days() {
return this.http.post<void>('/api/v1/admin/gather', {});
} }
public gatherMax() { public gatherMax() {
return this.http.post<void>(`/api/v1/admin/gather/max`, {}); return this.http.post<void>('/api/v1/admin/gather/max', {});
} }
public gatherProfileData() { public gatherProfileData() {
return this.http.post<void>(`/api/v1/admin/gather/profile-data`, {}); return this.http.post<void>('/api/v1/admin/gather/profile-data', {});
} }
public gatherProfileDataBySymbol({ dataSource, symbol }: UniqueAsset) { public gatherProfileDataBySymbol({ dataSource, symbol }: UniqueAsset) {

@ -1,4 +1,6 @@
import { DataSource } from '@prisma/client'; import { DataSource } from '@prisma/client';
import { JobOptions, JobStatus } from 'bull';
import ms from 'ms';
import { ToggleOption } from './types'; import { ToggleOption } from './types';
@ -43,19 +45,52 @@ export const warnColorRgb = {
export const ASSET_SUB_CLASS_EMERGENCY_FUND = 'EMERGENCY_FUND'; export const ASSET_SUB_CLASS_EMERGENCY_FUND = 'EMERGENCY_FUND';
export const DATA_GATHERING_QUEUE = 'DATA_GATHERING_QUEUE'; export const DATA_GATHERING_QUEUE = 'DATA_GATHERING_QUEUE';
export const DATA_GATHERING_QUEUE_PRIORITY_LOW = Number.MAX_SAFE_INTEGER;
export const DATA_GATHERING_QUEUE_PRIORITY_HIGH = 1;
export const DEFAULT_DATE_FORMAT_MONTH_YEAR = 'MMM yyyy'; export const DEFAULT_DATE_FORMAT_MONTH_YEAR = 'MMM yyyy';
export const GATHER_ASSET_PROFILE_PROCESS = 'GATHER_ASSET_PROFILE'; export const GATHER_ASSET_PROFILE_PROCESS = 'GATHER_ASSET_PROFILE';
export const GATHER_ASSET_PROFILE_PROCESS_OPTIONS: JobOptions = {
attempts: 20,
backoff: {
delay: ms('1 minute'),
type: 'exponential'
},
priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH,
removeOnComplete: {
age: ms('2 weeks') / 1000
}
};
export const GATHER_HISTORICAL_MARKET_DATA_PROCESS =
'GATHER_HISTORICAL_MARKET_DATA';
export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS: JobOptions = {
attempts: 20,
backoff: {
delay: ms('1 minute'),
type: 'exponential'
},
priority: DATA_GATHERING_QUEUE_PRIORITY_LOW,
removeOnComplete: {
age: ms('2 weeks') / 1000
}
};
export const PROPERTY_BENCHMARKS = 'BENCHMARKS'; export const PROPERTY_BENCHMARKS = 'BENCHMARKS';
export const PROPERTY_COUPONS = 'COUPONS'; export const PROPERTY_COUPONS = 'COUPONS';
export const PROPERTY_CURRENCIES = 'CURRENCIES'; export const PROPERTY_CURRENCIES = 'CURRENCIES';
export const PROPERTY_IS_READ_ONLY_MODE = 'IS_READ_ONLY_MODE'; export const PROPERTY_IS_READ_ONLY_MODE = 'IS_READ_ONLY_MODE';
export const PROPERTY_LAST_DATA_GATHERING = 'LAST_DATA_GATHERING';
export const PROPERTY_LOCKED_DATA_GATHERING = 'LOCKED_DATA_GATHERING';
export const PROPERTY_SLACK_COMMUNITY_USERS = 'SLACK_COMMUNITY_USERS'; export const PROPERTY_SLACK_COMMUNITY_USERS = 'SLACK_COMMUNITY_USERS';
export const PROPERTY_STRIPE_CONFIG = 'STRIPE_CONFIG'; export const PROPERTY_STRIPE_CONFIG = 'STRIPE_CONFIG';
export const PROPERTY_SYSTEM_MESSAGE = 'SYSTEM_MESSAGE'; export const PROPERTY_SYSTEM_MESSAGE = 'SYSTEM_MESSAGE';
export const QUEUE_JOB_STATUS_LIST = <JobStatus[]>[
'active',
'completed',
'delayed',
'failed',
'paused',
'waiting'
];
export const UNKNOWN_KEY = 'UNKNOWN'; export const UNKNOWN_KEY = 'UNKNOWN';

@ -1,7 +1,5 @@
export interface AdminData { export interface AdminData {
dataGatheringProgress?: number;
exchangeRates: { label1: string; label2: string; value: number }[]; exchangeRates: { label1: string; label2: string; value: number }[];
lastDataGathering?: Date | 'IN_PROGRESS';
settings: { [key: string]: boolean | object | string | string[] }; settings: { [key: string]: boolean | object | string | string[] };
transactionCount: number; transactionCount: number;
userCount: number; userCount: number;

@ -1,5 +1,16 @@
import { Job } from 'bull'; import { Job, JobStatus } from 'bull';
export interface AdminJobs { export interface AdminJobs {
jobs: Job<any>[]; jobs: (Pick<
Job<any>,
| 'attemptsMade'
| 'data'
| 'finishedOn'
| 'id'
| 'name'
| 'stacktrace'
| 'timestamp'
> & {
state: JobStatus | 'stuck';
})[];
} }

Loading…
Cancel
Save