Feature/improve queue jobs implementation (#1855)

* Improve queue jobs implementation

* Update changelog
pull/1856/head
Thomas Kaul 2 years ago committed by GitHub
parent 4451514ec5
commit 1ed5690b33
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## Unreleased
### Changed
- Improved the queue jobs implementation by adding in bulk
- Improved the queue jobs implementation by introducing unique job ids
## 1.253.0 - 2023-04-14 ## 1.253.0 - 2023-04-14
### Changed ### Changed

@ -107,7 +107,10 @@ export class AdminController {
dataSource, dataSource,
symbol symbol
}, },
GATHER_ASSET_PROFILE_PROCESS_OPTIONS {
...GATHER_ASSET_PROFILE_PROCESS_OPTIONS,
jobId: `${dataSource}-${symbol}}`
}
); );
} }
@ -138,7 +141,10 @@ export class AdminController {
dataSource, dataSource,
symbol symbol
}, },
GATHER_ASSET_PROFILE_PROCESS_OPTIONS {
...GATHER_ASSET_PROFILE_PROCESS_OPTIONS,
jobId: `${dataSource}-${symbol}}`
}
); );
} }
} }
@ -167,7 +173,10 @@ export class AdminController {
dataSource, dataSource,
symbol symbol
}, },
GATHER_ASSET_PROFILE_PROCESS_OPTIONS {
...GATHER_ASSET_PROFILE_PROCESS_OPTIONS,
jobId: `${dataSource}-${symbol}}`
}
); );
} }

@ -118,7 +118,10 @@ export class OrderService {
dataSource: data.SymbolProfile.connectOrCreate.create.dataSource, dataSource: data.SymbolProfile.connectOrCreate.create.dataSource,
symbol: data.SymbolProfile.connectOrCreate.create.symbol symbol: data.SymbolProfile.connectOrCreate.create.symbol
}, },
GATHER_ASSET_PROFILE_PROCESS_OPTIONS {
...GATHER_ASSET_PROFILE_PROCESS_OPTIONS,
jobId: `${data.SymbolProfile.connectOrCreate.create.dataSource}-${data.SymbolProfile.connectOrCreate.create.symbol}}`
}
); );
const isDraft = isAfter(data.date as Date, endOfToday()); const isDraft = isAfter(data.date as Date, endOfToday());

@ -21,13 +21,12 @@ export class CronService {
@Cron(CronExpression.EVERY_4_HOURS) @Cron(CronExpression.EVERY_4_HOURS)
public async runEveryFourHours() { public async runEveryFourHours() {
// await this.dataGatheringService.gather7Days(); await this.dataGatheringService.gather7Days();
} }
@Cron(CronExpression.EVERY_12_HOURS) @Cron(CronExpression.EVERY_12_HOURS)
public async runEveryTwelveHours() { public async runEveryTwelveHours() {
await this.exchangeRateDataService.loadCurrencies(); await this.exchangeRateDataService.loadCurrencies();
await this.dataGatheringService.gather7Days();
} }
@Cron(CronExpression.EVERY_DAY_AT_5PM) @Cron(CronExpression.EVERY_DAY_AT_5PM)
@ -46,7 +45,10 @@ export class CronService {
dataSource, dataSource,
symbol symbol
}, },
GATHER_ASSET_PROFILE_PROCESS_OPTIONS {
...GATHER_ASSET_PROFILE_PROCESS_OPTIONS,
jobId: `${dataSource}-${symbol}}`
}
); );
} }
} }

@ -2,8 +2,7 @@ import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile.se
import { import {
DATA_GATHERING_QUEUE, DATA_GATHERING_QUEUE,
GATHER_HISTORICAL_MARKET_DATA_PROCESS, GATHER_HISTORICAL_MARKET_DATA_PROCESS,
GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS, GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS
QUEUE_JOB_STATUS_LIST
} from '@ghostfolio/common/config'; } from '@ghostfolio/common/config';
import { DATE_FORMAT, resetHours } from '@ghostfolio/common/helper'; import { DATE_FORMAT, resetHours } from '@ghostfolio/common/helper';
import { UniqueAsset } from '@ghostfolio/common/interfaces'; import { UniqueAsset } from '@ghostfolio/common/interfaces';
@ -34,17 +33,14 @@ export class DataGatheringService {
private readonly symbolProfileService: SymbolProfileService private readonly symbolProfileService: SymbolProfileService
) {} ) {}
public async addJobToQueue(name: string, data: any, options?: JobOptions) { public async addJobToQueue(name: string, data: any, opts?: JobOptions) {
const hasJob = await this.hasJob(name, data); return this.dataGatheringQueue.add(name, data, opts);
}
if (hasJob) { public async addJobsToQueue(
Logger.log( jobs: { data: any; name: string; opts?: JobOptions }[]
`Job ${name} with data ${JSON.stringify(data)} already exists.`, ) {
'DataGatheringService' return this.dataGatheringQueue.addBulk(jobs);
);
} else {
return this.dataGatheringQueue.add(name, data, options);
}
} }
public async gather7Days() { public async gather7Days() {
@ -209,17 +205,22 @@ export class DataGatheringService {
} }
public async gatherSymbols(aSymbolsWithStartDate: IDataGatheringItem[]) { public async gatherSymbols(aSymbolsWithStartDate: IDataGatheringItem[]) {
for (const { dataSource, date, symbol } of aSymbolsWithStartDate) { await this.addJobsToQueue(
await this.addJobToQueue( aSymbolsWithStartDate.map(({ dataSource, date, symbol }) => {
GATHER_HISTORICAL_MARKET_DATA_PROCESS, return {
{ data: {
dataSource, dataSource,
date, date,
symbol symbol
}, },
GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS name: GATHER_HISTORICAL_MARKET_DATA_PROCESS,
); opts: {
} ...GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS,
jobId: `${dataSource}-${symbol}-${format(date, DATE_FORMAT)}`
}
};
})
);
} }
public async getSymbolsMax(): Promise<IDataGatheringItem[]> { public async getSymbolsMax(): Promise<IDataGatheringItem[]> {
@ -341,18 +342,4 @@ export class DataGatheringService {
return [...currencyPairsToGather, ...symbolProfilesToGather]; return [...currencyPairsToGather, ...symbolProfilesToGather];
} }
private async hasJob(name: string, data: any) {
const jobs = await this.dataGatheringQueue.getJobs(
QUEUE_JOB_STATUS_LIST.filter((status) => {
return status !== 'completed';
})
);
return jobs.some((job) => {
return (
job.name === name && JSON.stringify(job.data) === JSON.stringify(data)
);
});
}
} }

Loading…
Cancel
Save