fix(core): Reduce memory consumption on BinaryDataManager.init (#6633)
fix(core): Reduce memory consumption on BinaryDataManager.init. When there are a few thousand binary data files to delete, the `deleteMarkedFiles` and `deleteMarkedPersistedFiles` methods need a lot of memory to process them, irrespective of whether the files contain any data.
This commit is contained in:
committed by
GitHub
parent
180ab8d7c2
commit
329d22f5d1
@@ -37,7 +37,7 @@ export = {
|
||||
return res.status(404).json({ message: 'Not Found' });
|
||||
}
|
||||
|
||||
await BinaryDataManager.getInstance().deleteBinaryDataByExecutionId(execution.id!);
|
||||
await BinaryDataManager.getInstance().deleteBinaryDataByExecutionIds([execution.id!]);
|
||||
|
||||
await deleteExecution(execution);
|
||||
|
||||
|
||||
@@ -240,7 +240,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
|
||||
|
||||
async deleteExecution(executionId: string) {
|
||||
// TODO: Should this be awaited? Should we add a catch in case it fails?
|
||||
await BinaryDataManager.getInstance().deleteBinaryDataByExecutionId(executionId);
|
||||
await BinaryDataManager.getInstance().deleteBinaryDataByExecutionIds([executionId]);
|
||||
return this.delete({ id: executionId });
|
||||
}
|
||||
|
||||
@@ -392,17 +392,14 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
|
||||
return;
|
||||
}
|
||||
|
||||
const idsToDelete = executions.map(({ id }) => id);
|
||||
|
||||
const executionIds = executions.map(({ id }) => id);
|
||||
const binaryDataManager = BinaryDataManager.getInstance();
|
||||
await Promise.all(
|
||||
idsToDelete.map(async (id) => binaryDataManager.deleteBinaryDataByExecutionId(id)),
|
||||
);
|
||||
await binaryDataManager.deleteBinaryDataByExecutionIds(executionIds);
|
||||
|
||||
do {
|
||||
// Delete in batches to avoid "SQLITE_ERROR: Expression tree is too large (maximum depth 1000)" error
|
||||
const batch = idsToDelete.splice(0, 500);
|
||||
const batch = executionIds.splice(0, 500);
|
||||
await this.delete(batch);
|
||||
} while (idsToDelete.length > 0);
|
||||
} while (executionIds.length > 0);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import glob from 'fast-glob';
|
||||
import { createReadStream } from 'fs';
|
||||
import fs from 'fs/promises';
|
||||
import path from 'path';
|
||||
@@ -12,6 +13,9 @@ import { FileNotFoundError } from '../errors';
|
||||
const PREFIX_METAFILE = 'binarymeta';
|
||||
const PREFIX_PERSISTED_METAFILE = 'persistedmeta';
|
||||
|
||||
const executionExtractionRegexp =
|
||||
/^(\w+)(?:[0-9a-fA-F]{8}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{12})$/;
|
||||
|
||||
export class BinaryDataFileSystem implements IBinaryDataManager {
|
||||
private storagePath: string;
|
||||
|
||||
@@ -36,16 +40,12 @@ export class BinaryDataFileSystem implements IBinaryDataManager {
|
||||
}, this.persistedBinaryDataTTL * 30000);
|
||||
}
|
||||
|
||||
return fs
|
||||
.readdir(this.storagePath)
|
||||
.catch(async () => fs.mkdir(this.storagePath, { recursive: true }))
|
||||
.then(async () => fs.readdir(this.getBinaryDataMetaPath()))
|
||||
.catch(async () => fs.mkdir(this.getBinaryDataMetaPath(), { recursive: true }))
|
||||
.then(async () => fs.readdir(this.getBinaryDataPersistMetaPath()))
|
||||
.catch(async () => fs.mkdir(this.getBinaryDataPersistMetaPath(), { recursive: true }))
|
||||
.then(async () => this.deleteMarkedFiles())
|
||||
.then(async () => this.deleteMarkedPersistedFiles())
|
||||
.then(() => {});
|
||||
await this.assertFolder(this.storagePath);
|
||||
await this.assertFolder(this.getBinaryDataMetaPath());
|
||||
await this.assertFolder(this.getBinaryDataPersistMetaPath());
|
||||
|
||||
await this.deleteMarkedFiles();
|
||||
await this.deleteMarkedPersistedFiles();
|
||||
}
|
||||
|
||||
async getFileSize(identifier: string): Promise<number> {
|
||||
@@ -122,46 +122,37 @@ export class BinaryDataFileSystem implements IBinaryDataManager {
|
||||
`${PREFIX_PERSISTED_METAFILE}_${executionId}_${timeoutTime}`,
|
||||
);
|
||||
|
||||
return fs
|
||||
.readFile(filePath)
|
||||
.catch(async () => fs.writeFile(filePath, identifier))
|
||||
.then(() => {});
|
||||
try {
|
||||
await fs.access(filePath);
|
||||
} catch {
|
||||
await fs.writeFile(filePath, identifier);
|
||||
}
|
||||
}
|
||||
|
||||
private async deleteMarkedFilesByMeta(metaPath: string, filePrefix: string): Promise<void> {
|
||||
const currentTimeValue = new Date().valueOf();
|
||||
const metaFileNames = await fs.readdir(metaPath);
|
||||
const metaFileNames = await glob(`${filePrefix}_*`, { cwd: metaPath });
|
||||
|
||||
const execsAdded: { [key: string]: number } = {};
|
||||
const executionIds = metaFileNames
|
||||
.map((f) => f.split('_') as [string, string, string])
|
||||
.filter(([prefix, , ts]) => {
|
||||
if (prefix !== filePrefix) return false;
|
||||
const execTimestamp = parseInt(ts, 10);
|
||||
return execTimestamp < currentTimeValue;
|
||||
})
|
||||
.map((e) => e[1]);
|
||||
|
||||
const promises = metaFileNames.reduce<Array<Promise<void>>>((prev, curr) => {
|
||||
const [prefix, executionId, ts] = curr.split('_');
|
||||
|
||||
if (prefix !== filePrefix) {
|
||||
return prev;
|
||||
}
|
||||
|
||||
const execTimestamp = parseInt(ts, 10);
|
||||
|
||||
if (execTimestamp < currentTimeValue) {
|
||||
if (execsAdded[executionId]) {
|
||||
// do not delete data, only meta file
|
||||
prev.push(this.deleteMetaFileByPath(path.join(metaPath, curr)));
|
||||
return prev;
|
||||
}
|
||||
|
||||
execsAdded[executionId] = 1;
|
||||
prev.push(
|
||||
this.deleteBinaryDataByExecutionId(executionId).then(async () =>
|
||||
this.deleteMetaFileByPath(path.join(metaPath, curr)),
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
return prev;
|
||||
}, []);
|
||||
|
||||
await Promise.all(promises);
|
||||
const filesToDelete = [];
|
||||
const deletedIds = await this.deleteBinaryDataByExecutionIds(executionIds);
|
||||
for (const executionId of deletedIds) {
|
||||
filesToDelete.push(
|
||||
...(await glob(`${filePrefix}_${executionId}_`, {
|
||||
absolute: true,
|
||||
cwd: metaPath,
|
||||
})),
|
||||
);
|
||||
}
|
||||
await Promise.all(filesToDelete.map(async (file) => fs.rm(file)));
|
||||
}
|
||||
|
||||
async duplicateBinaryDataByIdentifier(binaryDataId: string, prefix: string): Promise<string> {
|
||||
@@ -174,18 +165,19 @@ export class BinaryDataFileSystem implements IBinaryDataManager {
|
||||
return newBinaryDataId;
|
||||
}
|
||||
|
||||
async deleteBinaryDataByExecutionId(executionId: string): Promise<void> {
|
||||
const regex = new RegExp(`${executionId}_*`);
|
||||
const filenames = await fs.readdir(this.storagePath);
|
||||
|
||||
const promises = filenames.reduce<Array<Promise<void>>>((allProms, filename) => {
|
||||
if (regex.test(filename)) {
|
||||
allProms.push(fs.rm(this.resolveStoragePath(filename)));
|
||||
async deleteBinaryDataByExecutionIds(executionIds: string[]): Promise<string[]> {
|
||||
const set = new Set(executionIds);
|
||||
const fileNames = await fs.readdir(this.storagePath);
|
||||
const deletedIds = [];
|
||||
for (const fileName of fileNames) {
|
||||
const executionId = fileName.match(executionExtractionRegexp)?.[1];
|
||||
if (executionId && set.has(executionId)) {
|
||||
const filePath = this.resolveStoragePath(fileName);
|
||||
await Promise.all([fs.rm(filePath), fs.rm(`${filePath}.metadata`)]);
|
||||
deletedIds.push(executionId);
|
||||
}
|
||||
return allProms;
|
||||
}, []);
|
||||
|
||||
await Promise.all(promises);
|
||||
}
|
||||
return deletedIds;
|
||||
}
|
||||
|
||||
async deleteBinaryDataByIdentifier(identifier: string): Promise<void> {
|
||||
@@ -193,18 +185,24 @@ export class BinaryDataFileSystem implements IBinaryDataManager {
|
||||
}
|
||||
|
||||
async persistBinaryDataForExecutionId(executionId: string): Promise<void> {
|
||||
return fs.readdir(this.getBinaryDataPersistMetaPath()).then(async (metaFiles) => {
|
||||
const promises = metaFiles.reduce<Array<Promise<void>>>((prev, curr) => {
|
||||
if (curr.startsWith(`${PREFIX_PERSISTED_METAFILE}_${executionId}_`)) {
|
||||
prev.push(fs.rm(path.join(this.getBinaryDataPersistMetaPath(), curr)));
|
||||
return prev;
|
||||
}
|
||||
|
||||
const metaFiles = await fs.readdir(this.getBinaryDataPersistMetaPath());
|
||||
const promises = metaFiles.reduce<Array<Promise<void>>>((prev, curr) => {
|
||||
if (curr.startsWith(`${PREFIX_PERSISTED_METAFILE}_${executionId}_`)) {
|
||||
prev.push(fs.rm(path.join(this.getBinaryDataPersistMetaPath(), curr)));
|
||||
return prev;
|
||||
}, []);
|
||||
}
|
||||
|
||||
await Promise.all(promises);
|
||||
});
|
||||
return prev;
|
||||
}, []);
|
||||
await Promise.all(promises);
|
||||
}
|
||||
|
||||
private async assertFolder(folder: string): Promise<void> {
|
||||
try {
|
||||
await fs.access(folder);
|
||||
} catch {
|
||||
await fs.mkdir(folder, { recursive: true });
|
||||
}
|
||||
}
|
||||
|
||||
private generateFileName(prefix: string): string {
|
||||
@@ -219,10 +217,6 @@ export class BinaryDataFileSystem implements IBinaryDataManager {
|
||||
return path.join(this.storagePath, 'persistMeta');
|
||||
}
|
||||
|
||||
private async deleteMetaFileByPath(metaFilePath: string): Promise<void> {
|
||||
return fs.rm(metaFilePath);
|
||||
}
|
||||
|
||||
private async deleteFromLocalStorage(identifier: string) {
|
||||
return fs.rm(this.getBinaryPath(identifier));
|
||||
}
|
||||
|
||||
@@ -178,9 +178,9 @@ export class BinaryDataManager {
|
||||
}
|
||||
}
|
||||
|
||||
async deleteBinaryDataByExecutionId(executionId: string): Promise<void> {
|
||||
async deleteBinaryDataByExecutionIds(executionIds: string[]): Promise<void> {
|
||||
if (this.managers[this.binaryDataMode]) {
|
||||
await this.managers[this.binaryDataMode].deleteBinaryDataByExecutionId(executionId);
|
||||
await this.managers[this.binaryDataMode].deleteBinaryDataByExecutionIds(executionIds);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -72,7 +72,7 @@ export interface IBinaryDataManager {
|
||||
deleteMarkedFiles(): Promise<unknown>;
|
||||
deleteBinaryDataByIdentifier(identifier: string): Promise<void>;
|
||||
duplicateBinaryDataByIdentifier(binaryDataId: string, prefix: string): Promise<string>;
|
||||
deleteBinaryDataByExecutionId(executionId: string): Promise<void>;
|
||||
deleteBinaryDataByExecutionIds(executionIds: string[]): Promise<string[]>;
|
||||
persistBinaryDataForExecutionId(executionId: string): Promise<void>;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user