Feature/set up portfolio snapshot queue (#3725)
* Set up portfolio snapshot queue * Update changelogpull/3748/head
parent
383a02519a
commit
403ee2741d
@ -1,13 +1,28 @@
|
||||
import { Filter } from '@ghostfolio/common/interfaces';
|
||||
|
||||
import { Milliseconds } from 'cache-manager';
|
||||
|
||||
export const RedisCacheServiceMock = {
|
||||
cache: new Map<string, string>(),
|
||||
get: (key: string): Promise<string> => {
|
||||
return Promise.resolve(null);
|
||||
const value = RedisCacheServiceMock.cache.get(key) || null;
|
||||
|
||||
return Promise.resolve(value);
|
||||
},
|
||||
getPortfolioSnapshotKey: (userId: string): string => {
|
||||
return `portfolio-snapshot-${userId}`;
|
||||
getPortfolioSnapshotKey: ({
|
||||
filters,
|
||||
userId
|
||||
}: {
|
||||
filters?: Filter[];
|
||||
userId: string;
|
||||
}): string => {
|
||||
const filtersHash = filters?.length;
|
||||
|
||||
return `portfolio-snapshot-${userId}${filtersHash > 0 ? `-${filtersHash}` : ''}`;
|
||||
},
|
||||
set: (key: string, value: string, ttl?: Milliseconds): Promise<string> => {
|
||||
RedisCacheServiceMock.cache.set(key, value);
|
||||
|
||||
return Promise.resolve(value);
|
||||
}
|
||||
};
|
||||
|
@ -1,11 +1,11 @@
|
||||
import { ConfigurationModule } from '@ghostfolio/api/services/configuration/configuration.module';
|
||||
import { DataGatheringService } from '@ghostfolio/api/services/data-gathering/data-gathering.service';
|
||||
import { DataEnhancerModule } from '@ghostfolio/api/services/data-provider/data-enhancer/data-enhancer.module';
|
||||
import { DataProviderModule } from '@ghostfolio/api/services/data-provider/data-provider.module';
|
||||
import { ExchangeRateDataModule } from '@ghostfolio/api/services/exchange-rate-data/exchange-rate-data.module';
|
||||
import { MarketDataModule } from '@ghostfolio/api/services/market-data/market-data.module';
|
||||
import { PrismaModule } from '@ghostfolio/api/services/prisma/prisma.module';
|
||||
import { PropertyModule } from '@ghostfolio/api/services/property/property.module';
|
||||
import { DataGatheringService } from '@ghostfolio/api/services/queues/data-gathering/data-gathering.service';
|
||||
import { SymbolProfileModule } from '@ghostfolio/api/services/symbol-profile/symbol-profile.module';
|
||||
import { DATA_GATHERING_QUEUE } from '@ghostfolio/common/config';
|
||||
|
@ -0,0 +1,7 @@
|
||||
import { Filter } from '@ghostfolio/common/interfaces';
|
||||
|
||||
export interface IPortfolioSnapshotQueueJob {
|
||||
filters: Filter[];
|
||||
userCurrency: string;
|
||||
userId: string;
|
||||
}
|
@ -0,0 +1,37 @@
|
||||
import { OrderModule } from '@ghostfolio/api/app/order/order.module';
|
||||
import { PortfolioCalculatorFactory } from '@ghostfolio/api/app/portfolio/calculator/portfolio-calculator.factory';
|
||||
import { CurrentRateService } from '@ghostfolio/api/app/portfolio/current-rate.service';
|
||||
import { RedisCacheModule } from '@ghostfolio/api/app/redis-cache/redis-cache.module';
|
||||
import { ConfigurationModule } from '@ghostfolio/api/services/configuration/configuration.module';
|
||||
import { DataProviderModule } from '@ghostfolio/api/services/data-provider/data-provider.module';
|
||||
import { ExchangeRateDataModule } from '@ghostfolio/api/services/exchange-rate-data/exchange-rate-data.module';
|
||||
import { MarketDataModule } from '@ghostfolio/api/services/market-data/market-data.module';
|
||||
import { PortfolioSnapshotService } from '@ghostfolio/api/services/queues/portfolio-snapshot/portfolio-snapshot.service';
|
||||
import { PORTFOLIO_SNAPSHOT_QUEUE } from '@ghostfolio/common/config';
|
||||
|
||||
import { BullModule } from '@nestjs/bull';
|
||||
import { Module } from '@nestjs/common';
|
||||
|
||||
import { PortfolioSnapshotProcessor } from './portfolio-snapshot.processor';
|
||||
|
||||
@Module({
|
||||
exports: [BullModule, PortfolioSnapshotService],
|
||||
imports: [
|
||||
BullModule.registerQueue({
|
||||
name: PORTFOLIO_SNAPSHOT_QUEUE
|
||||
}),
|
||||
ConfigurationModule,
|
||||
DataProviderModule,
|
||||
ExchangeRateDataModule,
|
||||
MarketDataModule,
|
||||
OrderModule,
|
||||
RedisCacheModule
|
||||
],
|
||||
providers: [
|
||||
CurrentRateService,
|
||||
PortfolioCalculatorFactory,
|
||||
PortfolioSnapshotProcessor,
|
||||
PortfolioSnapshotService
|
||||
]
|
||||
})
|
||||
export class PortfolioSnapshotQueueModule {}
|
@ -0,0 +1,96 @@
|
||||
import { OrderService } from '@ghostfolio/api/app/order/order.service';
|
||||
import {
|
||||
PerformanceCalculationType,
|
||||
PortfolioCalculatorFactory
|
||||
} from '@ghostfolio/api/app/portfolio/calculator/portfolio-calculator.factory';
|
||||
import { PortfolioSnapshotValue } from '@ghostfolio/api/app/portfolio/interfaces/snapshot-value.interface';
|
||||
import { RedisCacheService } from '@ghostfolio/api/app/redis-cache/redis-cache.service';
|
||||
import { ConfigurationService } from '@ghostfolio/api/services/configuration/configuration.service';
|
||||
import {
|
||||
CACHE_TTL_INFINITE,
|
||||
PORTFOLIO_SNAPSHOT_PROCESS_JOB_NAME,
|
||||
PORTFOLIO_SNAPSHOT_QUEUE
|
||||
} from '@ghostfolio/common/config';
|
||||
|
||||
import { Process, Processor } from '@nestjs/bull';
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { Job } from 'bull';
|
||||
import { addMilliseconds } from 'date-fns';
|
||||
|
||||
import { IPortfolioSnapshotQueueJob } from './interfaces/portfolio-snapshot-queue-job.interface';
|
||||
|
||||
@Injectable()
|
||||
@Processor(PORTFOLIO_SNAPSHOT_QUEUE)
|
||||
export class PortfolioSnapshotProcessor {
|
||||
public constructor(
|
||||
private readonly calculatorFactory: PortfolioCalculatorFactory,
|
||||
private readonly configurationService: ConfigurationService,
|
||||
private readonly orderService: OrderService,
|
||||
private readonly redisCacheService: RedisCacheService
|
||||
) {}
|
||||
|
||||
@Process({ concurrency: 1, name: PORTFOLIO_SNAPSHOT_PROCESS_JOB_NAME })
|
||||
public async calculatePortfolioSnapshot(
|
||||
job: Job<IPortfolioSnapshotQueueJob>
|
||||
) {
|
||||
try {
|
||||
const startTime = performance.now();
|
||||
|
||||
Logger.log(
|
||||
`Portfolio snapshot calculation of user '${job.data.userId}' has been started`,
|
||||
`PortfolioSnapshotProcessor (${PORTFOLIO_SNAPSHOT_PROCESS_JOB_NAME})`
|
||||
);
|
||||
|
||||
const { activities } =
|
||||
await this.orderService.getOrdersForPortfolioCalculator({
|
||||
filters: job.data.filters,
|
||||
userCurrency: job.data.userCurrency,
|
||||
userId: job.data.userId
|
||||
});
|
||||
|
||||
const portfolioCalculator = this.calculatorFactory.createCalculator({
|
||||
activities,
|
||||
calculationType: PerformanceCalculationType.TWR,
|
||||
currency: job.data.userCurrency,
|
||||
filters: job.data.filters,
|
||||
userId: job.data.userId
|
||||
});
|
||||
|
||||
const snapshot = await portfolioCalculator.computeSnapshot();
|
||||
|
||||
Logger.log(
|
||||
`Portfolio snapshot calculation of user '${job.data.userId}' has been completed in ${(
|
||||
(performance.now() - startTime) /
|
||||
1000
|
||||
).toFixed(3)} seconds`,
|
||||
`PortfolioSnapshotProcessor (${PORTFOLIO_SNAPSHOT_PROCESS_JOB_NAME})`
|
||||
);
|
||||
|
||||
const expiration = addMilliseconds(
|
||||
new Date(),
|
||||
this.configurationService.get('CACHE_QUOTES_TTL')
|
||||
);
|
||||
|
||||
this.redisCacheService.set(
|
||||
this.redisCacheService.getPortfolioSnapshotKey({
|
||||
filters: job.data.filters,
|
||||
userId: job.data.userId
|
||||
}),
|
||||
JSON.stringify(<PortfolioSnapshotValue>(<unknown>{
|
||||
expiration: expiration.getTime(),
|
||||
portfolioSnapshot: snapshot
|
||||
})),
|
||||
CACHE_TTL_INFINITE
|
||||
);
|
||||
|
||||
return snapshot;
|
||||
} catch (error) {
|
||||
Logger.error(
|
||||
error,
|
||||
`PortfolioSnapshotProcessor (${PORTFOLIO_SNAPSHOT_PROCESS_JOB_NAME})`
|
||||
);
|
||||
|
||||
throw new Error(error);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,34 @@
|
||||
import { Job, JobOptions } from 'bull';
|
||||
import { setTimeout } from 'timers/promises';
|
||||
|
||||
import { IPortfolioSnapshotQueueJob } from './interfaces/portfolio-snapshot-queue-job.interface';
|
||||
|
||||
export const PortfolioSnapshotServiceMock = {
|
||||
addJobToQueue({
|
||||
data,
|
||||
name,
|
||||
opts
|
||||
}: {
|
||||
data: IPortfolioSnapshotQueueJob;
|
||||
name: string;
|
||||
opts?: JobOptions;
|
||||
}): Promise<Job<any>> {
|
||||
const mockJob: Partial<Job<any>> = {
|
||||
finished: async () => {
|
||||
await setTimeout(100);
|
||||
|
||||
return Promise.resolve();
|
||||
}
|
||||
};
|
||||
|
||||
this.jobsStore.set(opts?.jobId, mockJob);
|
||||
|
||||
return Promise.resolve(mockJob as Job<any>);
|
||||
},
|
||||
getJob(jobId: string): Promise<Job<any>> {
|
||||
const job = this.jobsStore.get(jobId);
|
||||
|
||||
return Promise.resolve(job as Job<any>);
|
||||
},
|
||||
jobsStore: new Map<string, Partial<Job<any>>>()
|
||||
};
|
@ -0,0 +1,31 @@
|
||||
import { PORTFOLIO_SNAPSHOT_QUEUE } from '@ghostfolio/common/config';
|
||||
|
||||
import { InjectQueue } from '@nestjs/bull';
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { JobOptions, Queue } from 'bull';
|
||||
|
||||
import { IPortfolioSnapshotQueueJob } from './interfaces/portfolio-snapshot-queue-job.interface';
|
||||
|
||||
@Injectable()
|
||||
export class PortfolioSnapshotService {
|
||||
public constructor(
|
||||
@InjectQueue(PORTFOLIO_SNAPSHOT_QUEUE)
|
||||
private readonly portfolioSnapshotQueue: Queue
|
||||
) {}
|
||||
|
||||
public async addJobToQueue({
|
||||
data,
|
||||
name,
|
||||
opts
|
||||
}: {
|
||||
data: IPortfolioSnapshotQueueJob;
|
||||
name: string;
|
||||
opts?: JobOptions;
|
||||
}) {
|
||||
return this.portfolioSnapshotQueue.add(name, data, opts);
|
||||
}
|
||||
|
||||
public async getJob(jobId: string) {
|
||||
return this.portfolioSnapshotQueue.getJob(jobId);
|
||||
}
|
||||
}
|
Loading…
Reference in new issue