Files
Automata/packages/cli/src/services/pruning.service.ts
Iván Ovejero 193fe5ac1f refactor(core): Log any hard-deletion errors during pruning (no-changelog) (#7965)
## Summary
Pruning is a minor background task so hard-deletion errors during
pruning should be simply logged for later investigation rather than
rethrown.

...

#### How to test the change:
1. ...


## Issues fixed
Include links to the GitHub issue, Community forum post, or **Linear
ticket**:
> Important for closing issues automatically and providing context to
reviewers.

...


## Review / Merge checklist
- [ ] PR title and summary are descriptive. **Remember, the title
automatically goes into the changelog. Use `(no-changelog)` otherwise.**
([conventions](https://github.com/n8n-io/n8n/blob/master/.github/pull_request_title_conventions.md))
- [ ] [Docs updated](https://github.com/n8n-io/n8n-docs) or follow-up
ticket created.
- [ ] Tests included.
> A bug is not considered fixed, unless a test is added to prevent it
from happening again. A feature is not complete without tests.
  >
> *(internal)* You can use Slack commands to trigger [e2e
tests](https://www.notion.so/n8n/How-to-use-Test-Instances-d65f49dfc51f441ea44367fb6f67eb0a?pvs=4#a39f9e5ba64a48b58a71d81c837e8227)
or [deploy test
instance](https://www.notion.so/n8n/How-to-use-Test-Instances-d65f49dfc51f441ea44367fb6f67eb0a?pvs=4#f6a177d32bde4b57ae2da0b8e454bfce)
or [deploy early access version on
Cloud](https://www.notion.so/n8n/Cloudbot-3dbe779836004972b7057bc989526998?pvs=4#fef2d36ab02247e1a0f65a74f6fb534e).
2023-12-11 11:08:14 +01:00

218 lines
6.4 KiB
TypeScript

import { Service } from 'typedi';
import { BinaryDataService } from 'n8n-core';
import type { FindOptionsWhere } from 'typeorm';
import { Brackets, In, IsNull, LessThanOrEqual, Not } from 'typeorm';
import { DateUtils } from 'typeorm/util/DateUtils';
import { inTest, TIME } from '@/constants';
import config from '@/config';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { Logger } from '@/Logger';
import { ExecutionEntity } from '@db/entities/ExecutionEntity';
import { jsonStringify } from 'n8n-workflow';
@Service()
export class PruningService {
	/** Maximum number of soft-deleted executions permanently removed per hard-deletion cycle. */
	private hardDeletionBatchSize = 100;

	/** Cycle intervals in milliseconds; the config values are in minutes. */
	private rates: Record<string, number> = {
		softDeletion: config.getEnv('executions.pruneDataIntervals.softDelete') * TIME.MINUTE,
		hardDeletion: config.getEnv('executions.pruneDataIntervals.hardDelete') * TIME.MINUTE,
	};

	public softDeletionInterval: NodeJS.Timer | undefined;

	public hardDeletionTimeout: NodeJS.Timeout | undefined;

	/**
	 * Whether pruning is meant to be running. A finished hard-deletion cycle checks
	 * this before rescheduling itself, so `stopPruning()` also halts a cycle that was
	 * already in flight when it was called (clearing the timeout alone cannot).
	 */
	private isPruningActive = false;

	constructor(
		private readonly logger: Logger,
		private readonly executionRepository: ExecutionRepository,
		private readonly binaryDataService: BinaryDataService,
	) {}

	/**
	 * Whether this instance should prune executions: pruning must be enabled in config,
	 * we must not be in a test run, the instance must be of type `main`, and in a
	 * multi-main setup it must not be a follower.
	 */
	isPruningEnabled() {
		if (
			!config.getEnv('executions.pruneData') ||
			inTest ||
			config.getEnv('generic.instanceType') !== 'main'
		) {
			return false;
		}

		// The instance type is known to be `main` at this point.
		if (
			config.getEnv('multiMainSetup.enabled') &&
			config.getEnv('multiMainSetup.instanceType') === 'follower'
		) {
			return false;
		}

		return true;
	}

	/**
	 * Start the soft-deletion interval and the hard-deletion timer chain.
	 *
	 * @important Call this method only after DB migrations have completed.
	 */
	startPruning() {
		this.logger.debug('[Pruning] Starting soft-deletion and hard-deletion timers');

		this.isPruningActive = true;
		this.setSoftDeletionInterval();
		this.scheduleHardDeletion();
	}

	/**
	 * Stop both timers. A hard-deletion cycle that is currently running is allowed to
	 * finish, but it will not schedule another one.
	 */
	stopPruning() {
		this.logger.debug('[Pruning] Removing soft-deletion and hard-deletion timers');

		this.isPruningActive = false;
		clearInterval(this.softDeletionInterval);
		clearTimeout(this.hardDeletionTimeout);
		this.softDeletionInterval = undefined;
		this.hardDeletionTimeout = undefined;
	}

	/**
	 * Run a soft-deletion cycle every `rateMs` milliseconds.
	 *
	 * @param rateMs Interval in ms; defaults to the configured soft-deletion rate.
	 */
	private setSoftDeletionInterval(rateMs = this.rates.softDeletion) {
		const when = [rateMs / TIME.MINUTE, 'min'].join(' ');

		this.softDeletionInterval = setInterval(() => {
			// `setInterval` discards the returned promise, so failures must be caught here
			// or they would surface as unhandled rejections. Pruning is best-effort: log only.
			this.softDeleteOnPruningCycle().catch((error: unknown) => {
				this.logger.error('[Pruning] Failed to soft-delete executions', {
					errorMessage: this.toErrorMessage(error),
				});
			});
		}, rateMs);

		this.logger.debug(`[Pruning] Soft-deletion scheduled every ${when}`);
	}

	/**
	 * Schedule the next hard-deletion cycle after `rateMs` milliseconds. Each cycle
	 * schedules its successor using the delay the cycle returns, for as long as
	 * pruning is active.
	 *
	 * @param rateMs Delay in ms; defaults to the configured hard-deletion rate.
	 */
	private scheduleHardDeletion(rateMs = this.rates.hardDeletion) {
		const when = [rateMs / TIME.MINUTE, 'min'].join(' ');

		this.hardDeletionTimeout = setTimeout(() => {
			this.hardDeleteOnPruningCycle()
				.then((rate) => {
					if (this.isPruningActive) this.scheduleHardDeletion(rate);
				})
				.catch((error: unknown) => {
					// Unexpected failure (e.g. while querying for executions): retry soon.
					if (this.isPruningActive) this.scheduleHardDeletion(1 * TIME.SECOND);

					this.logger.error('[Pruning] Failed to hard-delete executions', {
						errorMessage: this.toErrorMessage(error),
					});
				});
		}, rateMs);

		this.logger.debug(`[Pruning] Hard-deletion scheduled for next ${when}`);
	}

	/**
	 * Mark executions as deleted based on age and count, in a pruning cycle.
	 *
	 * An execution is soft-deleted when it is not already soft-deleted, its status is
	 * not `new`, `running` or `waiting`, and either it stopped more than
	 * `pruneDataMaxAge` hours ago or (when `pruneDataMaxCount > 0`) it falls outside the
	 * `pruneDataMaxCount` most recent executions by id.
	 */
	async softDeleteOnPruningCycle(): Promise<void> {
		this.logger.debug('[Pruning] Starting soft-deletion of executions');

		const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h
		const maxCount = config.getEnv('executions.pruneDataMaxCount');

		// Find ids of all executions that stopped more than `pruneDataMaxAge` hours ago
		const date = new Date();
		date.setHours(date.getHours() - maxAge);

		const toPrune: Array<FindOptionsWhere<ExecutionEntity>> = [
			// date reformatting needed - see https://github.com/typeorm/typeorm/issues/2286
			{ stoppedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)) },
		];

		if (maxCount > 0) {
			// The newest execution beyond the first `maxCount` (by descending id) marks the
			// cut-off: it and everything with a lower id exceed the count limit.
			const executions = await this.executionRepository.find({
				select: ['id'],
				skip: maxCount,
				take: 1,
				order: { id: 'DESC' },
			});

			if (executions[0]) {
				toPrune.push({ id: LessThanOrEqual(executions[0].id) });
			}
		}

		const [timeBasedWhere, countBasedWhere] = toPrune;

		const result = await this.executionRepository
			.createQueryBuilder()
			.update(ExecutionEntity)
			.set({ deletedAt: new Date() })
			.where({
				deletedAt: IsNull(),
				// Only mark executions as deleted if they are in an end state
				status: Not(In(['new', 'running', 'waiting'])),
			})
			.andWhere(
				new Brackets((qb) =>
					countBasedWhere
						? qb.where(timeBasedWhere).orWhere(countBasedWhere)
						: qb.where(timeBasedWhere),
				),
			)
			.execute();

		if (result.affected === 0) {
			this.logger.debug('[Pruning] Found no executions to soft-delete');
			return;
		}

		this.logger.debug('[Pruning] Soft-deleted executions', { count: result.affected });
	}

	/**
	 * Permanently remove all soft-deleted executions and their binary data, in a pruning cycle.
	 *
	 * Deletion failures are logged, not rethrown; only errors from the initial lookup
	 * propagate to the caller.
	 *
	 * @return Delay in ms after which the next cycle should be started
	 */
	private async hardDeleteOnPruningCycle(): Promise<number> {
		// Only executions soft-deleted more than `pruneDataHardDeleteBuffer` hours ago qualify.
		const date = new Date();
		date.setHours(date.getHours() - config.getEnv('executions.pruneDataHardDeleteBuffer'));

		const workflowIdsAndExecutionIds = (
			await this.executionRepository.find({
				select: ['workflowId', 'id'],
				where: {
					deletedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)),
				},
				take: this.hardDeletionBatchSize,

				/**
				 * @important This ensures soft-deleted executions are included,
				 * else `@DeleteDateColumn()` at `deletedAt` will exclude them.
				 */
				withDeleted: true,
			})
		).map(({ id: executionId, workflowId }) => ({ workflowId, executionId }));

		const executionIds = workflowIdsAndExecutionIds.map((o) => o.executionId);

		if (executionIds.length === 0) {
			this.logger.debug('[Pruning] Found no executions to hard-delete');
			return this.rates.hardDeletion;
		}

		try {
			this.logger.debug('[Pruning] Starting hard-deletion of executions', {
				executionIds,
			});

			// Binary data goes first, then the execution rows themselves.
			await this.binaryDataService.deleteMany(workflowIdsAndExecutionIds);
			await this.executionRepository.delete({ id: In(executionIds) });

			this.logger.debug('[Pruning] Hard-deleted executions', { executionIds });
		} catch (error) {
			// Pruning is a minor background task: log for later investigation, don't rethrow.
			this.logger.error('[Pruning] Failed to hard-delete executions', {
				executionIds,
				error: this.toErrorMessage(error),
			});
		}

		/**
		 * For next batch, speed up hard-deletion cycle in high-volume case
		 * to prevent high concurrency from causing duplicate deletions.
		 */
		const isHighVolume = executionIds.length >= this.hardDeletionBatchSize;

		return isHighVolume ? 1 * TIME.SECOND : this.rates.hardDeletion;
	}

	/**
	 * Convert a caught value into a loggable message. Non-`Error` throwables are
	 * JSON-stringified (tolerating circular references) instead of becoming `[object Object]`.
	 */
	private toErrorMessage(error: unknown): string {
		return error instanceof Error
			? error.message
			: jsonStringify(error, { replaceCircularRefs: true });
	}
}