diff --git a/CHANGELOG.md b/CHANGELOG.md index 90b46c51f..a0d860290 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,12 +5,20 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## Unreleased + +### Changed + +- Skipped creating queue jobs for asset profiles with `MANUAL` data source not having a scraper configuration +- Reduced the execution interval of the data gathering to every hour + ## 1.254.0 - 2023-04-14 ### Changed - Improved the queue jobs implementation by adding in bulk - Improved the queue jobs implementation by introducing unique job ids +- Reverted the execution interval of the data gathering from every 12 hours to every 4 hours ## 1.253.0 - 2023-04-14 diff --git a/apps/api/src/app/admin/admin.controller.ts b/apps/api/src/app/admin/admin.controller.ts index 6d34f8cdb..c32ecf03b 100644 --- a/apps/api/src/app/admin/admin.controller.ts +++ b/apps/api/src/app/admin/admin.controller.ts @@ -100,19 +100,21 @@ export class AdminController { const uniqueAssets = await this.dataGatheringService.getUniqueAssets(); - for (const { dataSource, symbol } of uniqueAssets) { - await this.dataGatheringService.addJobToQueue( - GATHER_ASSET_PROFILE_PROCESS, - { - dataSource, - symbol - }, - { - ...GATHER_ASSET_PROFILE_PROCESS_OPTIONS, - jobId: `${dataSource}-${symbol}}` - } - ); - } + await this.dataGatheringService.addJobsToQueue( + uniqueAssets.map(({ dataSource, symbol }) => { + return { + data: { + dataSource, + symbol + }, + name: GATHER_ASSET_PROFILE_PROCESS, + opts: { + ...GATHER_ASSET_PROFILE_PROCESS_OPTIONS, + jobId: `${dataSource}-${symbol}}` + } + }; + }) + ); this.dataGatheringService.gatherMax(); } @@ -134,19 +136,21 @@ export class AdminController { const uniqueAssets = await this.dataGatheringService.getUniqueAssets(); - for (const { dataSource, symbol } of uniqueAssets) { - await this.dataGatheringService.addJobToQueue( - GATHER_ASSET_PROFILE_PROCESS, - { - dataSource, - symbol - }, - { - ...GATHER_ASSET_PROFILE_PROCESS_OPTIONS, - jobId: `${dataSource}-${symbol}}` - } - ); - } + await this.dataGatheringService.addJobsToQueue( + uniqueAssets.map(({ dataSource, symbol }) => { + return { + data: { + dataSource, + symbol + }, + name: GATHER_ASSET_PROFILE_PROCESS, + opts: { + ...GATHER_ASSET_PROFILE_PROCESS_OPTIONS, + jobId: `${dataSource}-${symbol}}` + } + }; + }) + ); } @Post('gather/profile-data/:dataSource/:symbol') @@ -167,17 +171,17 @@ export class AdminController { ); } - await this.dataGatheringService.addJobToQueue( - GATHER_ASSET_PROFILE_PROCESS, - { + await this.dataGatheringService.addJobToQueue({ + data: { dataSource, symbol }, - { + name: GATHER_ASSET_PROFILE_PROCESS, + opts: { ...GATHER_ASSET_PROFILE_PROCESS_OPTIONS, jobId: `${dataSource}-${symbol}}` } - ); + }); } @Post('gather/:dataSource/:symbol') diff --git a/apps/api/src/app/order/order.service.ts b/apps/api/src/app/order/order.service.ts index 50cc3bf71..4080c5c34 100644 --- a/apps/api/src/app/order/order.service.ts +++ b/apps/api/src/app/order/order.service.ts @@ -112,17 +112,17 @@ export class OrderService { }; } - await this.dataGatheringService.addJobToQueue( - GATHER_ASSET_PROFILE_PROCESS, - { + await this.dataGatheringService.addJobToQueue({ + data: { dataSource: data.SymbolProfile.connectOrCreate.create.dataSource, symbol: data.SymbolProfile.connectOrCreate.create.symbol }, - { + name: GATHER_ASSET_PROFILE_PROCESS, + opts: { ...GATHER_ASSET_PROFILE_PROCESS_OPTIONS, jobId: `${data.SymbolProfile.connectOrCreate.create.dataSource}-${data.SymbolProfile.connectOrCreate.create.symbol}}` } - ); + }); const isDraft = isAfter(data.date as Date, endOfToday()); diff --git a/apps/api/src/services/cron.service.ts b/apps/api/src/services/cron.service.ts index 69906dfa9..bf186b32f 100644 --- a/apps/api/src/services/cron.service.ts +++ b/apps/api/src/services/cron.service.ts @@ -19,8 +19,8 @@ export class CronService { private readonly twitterBotService: TwitterBotService ) {} - @Cron(CronExpression.EVERY_4_HOURS) - public async runEveryFourHours() { + @Cron(CronExpression.EVERY_HOUR) + public async runEveryHour() { await this.dataGatheringService.gather7Days(); } @@ -38,18 +38,20 @@ export class CronService { public async runEverySundayAtTwelvePm() { const uniqueAssets = await this.dataGatheringService.getUniqueAssets(); - for (const { dataSource, symbol } of uniqueAssets) { - await this.dataGatheringService.addJobToQueue( - GATHER_ASSET_PROFILE_PROCESS, - { - dataSource, - symbol - }, - { - ...GATHER_ASSET_PROFILE_PROCESS_OPTIONS, - jobId: `${dataSource}-${symbol}}` - } - ); - } + await this.dataGatheringService.addJobsToQueue( + uniqueAssets.map(({ dataSource, symbol }) => { + return { + data: { + dataSource, + symbol + }, + name: GATHER_ASSET_PROFILE_PROCESS, + opts: { + ...GATHER_ASSET_PROFILE_PROCESS_OPTIONS, + jobId: `${dataSource}-${symbol}}` + } + }; + }) + ); } } diff --git a/apps/api/src/services/data-gathering.service.ts b/apps/api/src/services/data-gathering.service.ts index 4015cf114..5de70c925 100644 --- a/apps/api/src/services/data-gathering.service.ts +++ b/apps/api/src/services/data-gathering.service.ts @@ -11,6 +11,7 @@ import { Inject, Injectable, Logger } from '@nestjs/common'; import { DataSource } from '@prisma/client'; import { JobOptions, Queue } from 'bull'; import { format, min, subDays, subYears } from 'date-fns'; +import { isEmpty } from 'lodash'; import { DataProviderService } from './data-provider/data-provider.service'; import { DataEnhancerInterface } from './data-provider/interfaces/data-enhancer.interface'; @@ -33,7 +34,15 @@ export class DataGatheringService { private readonly symbolProfileService: SymbolProfileService ) {} - public async addJobToQueue(name: string, data: any, opts?: JobOptions) { + public async addJobToQueue({ + data, + name, + opts + }: { + data: any; + name: string; + opts?: JobOptions; + }) { return this.dataGatheringQueue.add(name, data, opts); } @@ -223,48 +232,6 @@ export class DataGatheringService { ); } - public async getSymbolsMax(): Promise { - const startDate = - ( - await this.prismaService.order.findFirst({ - orderBy: [{ date: 'asc' }] - }) - )?.date ?? new Date(); - - const currencyPairsToGather = this.exchangeRateDataService - .getCurrencyPairs() - .map(({ dataSource, symbol }) => { - return { - dataSource, - symbol, - date: min([startDate, subYears(new Date(), 10)]) - }; - }); - - const symbolProfilesToGather = ( - await this.prismaService.symbolProfile.findMany({ - orderBy: [{ symbol: 'asc' }], - select: { - dataSource: true, - Order: { - orderBy: [{ date: 'asc' }], - select: { date: true }, - take: 1 - }, - scraperConfiguration: true, - symbol: true - } - }) - ).map((symbolProfile) => { - return { - ...symbolProfile, - date: symbolProfile.Order?.[0]?.date ?? startDate - }; - }); - - return [...currencyPairsToGather, ...symbolProfilesToGather]; - } - public async getUniqueAssets(): Promise { const symbolProfiles = await this.prismaService.symbolProfile.findMany({ orderBy: [{ symbol: 'asc' }] @@ -299,7 +266,7 @@ export class DataGatheringService { // Only consider symbols with incomplete market data for the last // 7 days - const symbolsNotToGather = ( + const symbolsWithCompleteMarketData = ( await this.prismaService.marketData.groupBy({ _count: true, by: ['symbol'], @@ -317,8 +284,14 @@ export class DataGatheringService { }); const symbolProfilesToGather = symbolProfiles - .filter(({ symbol }) => { - return !symbolsNotToGather.includes(symbol); + .filter(({ dataSource, scraperConfiguration, symbol }) => { + const manualDataSourceWithScraperConfiguration = + dataSource === 'MANUAL' && !isEmpty(scraperConfiguration); + + return ( + !symbolsWithCompleteMarketData.includes(symbol) && + (dataSource !== 'MANUAL' || manualDataSourceWithScraperConfiguration) + ); }) .map((symbolProfile) => { return { @@ -330,7 +303,7 @@ export class DataGatheringService { const currencyPairsToGather = this.exchangeRateDataService .getCurrencyPairs() .filter(({ symbol }) => { - return !symbolsNotToGather.includes(symbol); + return !symbolsWithCompleteMarketData.includes(symbol); }) .map(({ dataSource, symbol }) => { return { @@ -342,4 +315,57 @@ export class DataGatheringService { return [...currencyPairsToGather, ...symbolProfilesToGather]; } + + private async getSymbolsMax(): Promise { + const startDate = + ( + await this.prismaService.order.findFirst({ + orderBy: [{ date: 'asc' }] + }) + )?.date ?? new Date(); + + const currencyPairsToGather = this.exchangeRateDataService + .getCurrencyPairs() + .map(({ dataSource, symbol }) => { + return { + dataSource, + symbol, + date: min([startDate, subYears(new Date(), 10)]) + }; + }); + + const symbolProfilesToGather = ( + await this.prismaService.symbolProfile.findMany({ + orderBy: [{ symbol: 'asc' }], + select: { + dataSource: true, + Order: { + orderBy: [{ date: 'asc' }], + select: { date: true }, + take: 1 + }, + scraperConfiguration: true, + symbol: true + } + }) + ) + .filter((symbolProfile) => { + const manualDataSourceWithScraperConfiguration = + symbolProfile.dataSource === 'MANUAL' && + !isEmpty(symbolProfile.scraperConfiguration); + + return ( + symbolProfile.dataSource !== 'MANUAL' || + manualDataSourceWithScraperConfiguration + ); + }) + .map((symbolProfile) => { + return { + ...symbolProfile, + date: symbolProfile.Order?.[0]?.date ?? startDate + }; + }); + + return [...currencyPairsToGather, ...symbolProfilesToGather]; + } }