feat(core): Switch binary filesystem mode to nested path structure (#7307)

Depends on #7253 | Story:
[PAY-863](https://linear.app/n8n/issue/PAY-863/switch-binary-filesystem-mode-to-nested-path-structure)

This PR introduces `filesystem-v2` to store binary data in the
filesystem in the same format as `s3`.
This commit is contained in:
Iván Ovejero
2023-10-10 10:06:06 +02:00
committed by GitHub
parent 86e7ec796a
commit 0847623f85
13 changed files with 326 additions and 186 deletions

View File

@@ -4,8 +4,8 @@ import { readFile, stat } from 'node:fs/promises';
import prettyBytes from 'pretty-bytes';
import Container, { Service } from 'typedi';
import { BINARY_ENCODING, LoggerProxy as Logger, IBinaryData } from 'n8n-workflow';
import { UnknownBinaryDataManagerError, InvalidBinaryDataModeError } from './errors';
import { areValidModes, toBuffer } from './utils';
import { UnknownManagerError, InvalidModeError } from './errors';
import { areConfigModes, toBuffer } from './utils';
import { LogCatch } from '../decorators/LogCatch.decorator';
import type { Readable } from 'stream';
@@ -14,21 +14,20 @@ import type { INodeExecutionData } from 'n8n-workflow';
@Service()
export class BinaryDataService {
private mode: BinaryData.Mode = 'default';
private mode: BinaryData.ServiceMode = 'default';
private managers: Record<string, BinaryData.Manager> = {};
async init(config: BinaryData.Config) {
if (!areValidModes(config.availableModes)) {
throw new InvalidBinaryDataModeError();
}
if (!areConfigModes(config.availableModes)) throw new InvalidModeError();
this.mode = config.mode;
this.mode = config.mode === 'filesystem' ? 'filesystem-v2' : config.mode;
if (config.availableModes.includes('filesystem')) {
const { FileSystemManager } = await import('./FileSystem.manager');
this.managers.filesystem = new FileSystemManager(config.localStoragePath);
this.managers['filesystem-v2'] = this.managers.filesystem;
await this.managers.filesystem.init();
}
@@ -200,9 +199,6 @@ export class BinaryDataService {
// private methods
// ----------------------------------
/**
* Create an identifier `${mode}:{fileId}` for `IBinaryData['id']`.
*/
private createBinaryDataId(fileId: string) {
return `${this.mode}:${fileId}`;
}
@@ -253,6 +249,6 @@ export class BinaryDataService {
if (manager) return manager;
throw new UnknownBinaryDataManagerError(mode);
throw new UnknownManagerError(mode);
}
}

View File

@@ -3,7 +3,7 @@ import fs from 'node:fs/promises';
import path from 'node:path';
import { v4 as uuid } from 'uuid';
import { jsonParse } from 'n8n-workflow';
import { ensureDirExists } from './utils';
import { assertDir } from './utils';
import { FileNotFoundError } from '../errors';
import type { Readable } from 'stream';
@@ -16,7 +16,7 @@ export class FileSystemManager implements BinaryData.Manager {
constructor(private storagePath: string) {}
async init() {
await ensureDirExists(this.storagePath);
await assertDir(this.storagePath);
}
async store(
@@ -28,6 +28,8 @@ export class FileSystemManager implements BinaryData.Manager {
const fileId = this.toFileId(workflowId, executionId);
const filePath = this.resolvePath(fileId);
await assertDir(path.dirname(filePath));
await fs.writeFile(filePath, bufferOrStream);
const fileSize = await this.getSize(fileId);
@@ -64,6 +66,10 @@ export class FileSystemManager implements BinaryData.Manager {
}
async deleteMany(ids: BinaryData.IdsForDeletion) {
if (ids.length === 0) return;
// binary files stored in single dir - `filesystem`
const executionIds = ids.map((o) => o.executionId);
const set = new Set(executionIds);
@@ -78,6 +84,18 @@ export class FileSystemManager implements BinaryData.Manager {
await Promise.all([fs.rm(filePath), fs.rm(`${filePath}.metadata`)]);
}
}
// binary files stored in nested dirs - `filesystem-v2`
const binaryDataDirs = ids.map(({ workflowId, executionId }) =>
this.resolvePath(`workflows/${workflowId}/executions/${executionId}/binary_data/`),
);
await Promise.all(
binaryDataDirs.map(async (dir) => {
await fs.rm(dir, { recursive: true });
}),
);
}
async copyByFilePath(
@@ -89,6 +107,8 @@ export class FileSystemManager implements BinaryData.Manager {
const targetFileId = this.toFileId(workflowId, executionId);
const targetPath = this.resolvePath(targetFileId);
await assertDir(path.dirname(targetPath));
await fs.cp(sourcePath, targetPath);
const fileSize = await this.getSize(targetFileId);
@@ -103,6 +123,8 @@ export class FileSystemManager implements BinaryData.Manager {
const sourcePath = this.resolvePath(sourceFileId);
const targetPath = this.resolvePath(targetFileId);
await assertDir(path.dirname(targetPath));
await fs.copyFile(sourcePath, targetPath);
return targetFileId;
@@ -112,10 +134,17 @@ export class FileSystemManager implements BinaryData.Manager {
const oldPath = this.resolvePath(oldFileId);
const newPath = this.resolvePath(newFileId);
await assertDir(path.dirname(newPath));
await Promise.all([
fs.rename(oldPath, newPath),
fs.rename(`${oldPath}.metadata`, `${newPath}.metadata`),
]);
const [tempDirParent] = oldPath.split('/temp/');
const tempDir = path.join(tempDirParent, 'temp');
await fs.rm(tempDir, { recursive: true });
}
// ----------------------------------
@@ -123,12 +152,15 @@ export class FileSystemManager implements BinaryData.Manager {
// ----------------------------------
/**
* @tech_debt The `workflowId` argument is for compatibility with the
* `BinaryData.Manager` interface. Unused here until we refactor
* how we store binary data files in the `/binaryData` dir.
* Generate an ID for a binary data file.
*
* The legacy ID format `{executionId}{uuid}` for `filesystem` mode is
* no longer used on write, only when reading old stored execution data.
*/
private toFileId(_workflowId: string, executionId: string) {
return [executionId, uuid()].join('');
private toFileId(workflowId: string, executionId: string) {
if (!executionId) executionId = 'temp'; // missing only in edge case, see PR #7244
return `workflows/${workflowId}/executions/${executionId}/binary_data/${uuid()}`;
}
private resolvePath(...args: string[]) {

View File

@@ -1,10 +1,10 @@
import { BINARY_DATA_MODES } from './utils';
import { CONFIG_MODES } from './utils';
export class InvalidBinaryDataModeError extends Error {
message = `Invalid binary data mode. Valid modes: ${BINARY_DATA_MODES.join(', ')}`;
export class InvalidModeError extends Error {
message = `Invalid binary data mode. Valid modes: ${CONFIG_MODES.join(', ')}`;
}
export class UnknownBinaryDataManagerError extends Error {
export class UnknownManagerError extends Error {
constructor(mode: string) {
super(`No binary data manager found for: ${mode}`);
}

View File

@@ -1,14 +1,30 @@
import type { Readable } from 'stream';
import type { BINARY_DATA_MODES } from './utils';
export namespace BinaryData {
export type Mode = (typeof BINARY_DATA_MODES)[number];
type LegacyMode = 'filesystem';
export type NonDefaultMode = Exclude<Mode, 'default'>;
type UpgradedMode = 'filesystem-v2';
/**
* Binary data mode selectable by user via env var config.
*/
export type ConfigMode = 'default' | 'filesystem' | 's3';
/**
* Binary data mode used internally by binary data service. User-selected
* legacy modes are replaced with upgraded modes.
*/
export type ServiceMode = Exclude<ConfigMode, LegacyMode> | UpgradedMode;
/**
* Binary data mode in binary data ID in stored execution data. Both legacy
* and upgraded modes may be present, except default in-memory mode.
*/
export type StoredMode = Exclude<ConfigMode | UpgradedMode, 'default'>;
export type Config = {
mode: Mode;
availableModes: string[];
mode: ConfigMode;
availableModes: ConfigMode[];
localStoragePath: string;
};

View File

@@ -3,23 +3,19 @@ import type { Readable } from 'node:stream';
import type { BinaryData } from './types';
import concatStream from 'concat-stream';
/**
* Modes for storing binary data:
* - `default` (in memory)
* - `filesystem` (on disk)
* - `s3` (S3-compatible storage)
*/
export const BINARY_DATA_MODES = ['default', 'filesystem', 's3'] as const;
export const CONFIG_MODES = ['default', 'filesystem', 's3'] as const;
export function areValidModes(modes: string[]): modes is BinaryData.Mode[] {
return modes.every((m) => BINARY_DATA_MODES.includes(m as BinaryData.Mode));
const STORED_MODES = ['filesystem', 'filesystem-v2', 's3'] as const;
export function areConfigModes(modes: string[]): modes is BinaryData.ConfigMode[] {
return modes.every((m) => CONFIG_MODES.includes(m as BinaryData.ConfigMode));
}
export function isValidNonDefaultMode(mode: string): mode is BinaryData.NonDefaultMode {
return BINARY_DATA_MODES.filter((m) => m !== 'default').includes(mode as BinaryData.Mode);
export function isStoredMode(mode: string): mode is BinaryData.StoredMode {
return STORED_MODES.includes(mode as BinaryData.StoredMode);
}
export async function ensureDirExists(dir: string) {
export async function assertDir(dir: string) {
try {
await fs.access(dir);
} catch {

View File

@@ -18,4 +18,4 @@ export { NodeExecuteFunctions, UserSettings };
export * from './errors';
export { ObjectStoreService } from './ObjectStore/ObjectStore.service.ee';
export { BinaryData } from './BinaryData/types';
export { isValidNonDefaultMode } from './BinaryData/utils';
export { isStoredMode as isValidNonDefaultMode } from './BinaryData/utils';

View File

@@ -0,0 +1,172 @@
import path from 'node:path';
import fs from 'node:fs';
import fsp from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { FileSystemManager } from '@/BinaryData/FileSystem.manager';
import { isStream } from '@/ObjectStore/utils';
import { toFileId, toStream } from './utils';
jest.mock('fs');
jest.mock('fs/promises');
const storagePath = tmpdir();
const fsManager = new FileSystemManager(storagePath);
const toFullFilePath = (fileId: string) => path.join(storagePath, fileId);
const workflowId = 'ObogjVbqpNOQpiyV';
const executionId = '999';
const fileUuid = '71f6209b-5d48-41a2-a224-80d529d8bb32';
const fileId = toFileId(workflowId, executionId, fileUuid);
const otherWorkflowId = 'FHio8ftV6SrCAfPJ';
const otherExecutionId = '888';
const otherFileUuid = '71f6209b-5d48-41a2-a224-80d529d8bb33';
const otherFileId = toFileId(otherWorkflowId, otherExecutionId, otherFileUuid);
const mockBuffer = Buffer.from('Test data');
const mockStream = toStream(mockBuffer);
afterAll(() => {
jest.restoreAllMocks();
});
describe('store()', () => {
it('should store a buffer', async () => {
const metadata = { mimeType: 'text/plain' };
const result = await fsManager.store(workflowId, executionId, mockBuffer, metadata);
expect(result.fileSize).toBe(mockBuffer.length);
});
});
describe('getPath()', () => {
it('should return a path', async () => {
const filePath = fsManager.getPath(fileId);
expect(filePath).toBe(toFullFilePath(fileId));
});
});
describe('getAsBuffer()', () => {
it('should return a buffer', async () => {
fsp.readFile = jest.fn().mockResolvedValue(mockBuffer);
const result = await fsManager.getAsBuffer(fileId);
expect(Buffer.isBuffer(result)).toBe(true);
expect(fsp.readFile).toHaveBeenCalledWith(toFullFilePath(fileId));
});
});
describe('getAsStream()', () => {
it('should return a stream', async () => {
fs.createReadStream = jest.fn().mockReturnValue(mockStream);
const stream = await fsManager.getAsStream(fileId);
expect(isStream(stream)).toBe(true);
expect(fs.createReadStream).toHaveBeenCalledWith(toFullFilePath(fileId), {
highWaterMark: undefined,
});
});
});
describe('getMetadata()', () => {
it('should return metadata', async () => {
const mimeType = 'text/plain';
const fileName = 'file.txt';
fsp.readFile = jest.fn().mockResolvedValue(
JSON.stringify({
fileSize: 1,
mimeType,
fileName,
}),
);
const metadata = await fsManager.getMetadata(fileId);
expect(metadata).toEqual(expect.objectContaining({ fileSize: 1, mimeType, fileName }));
});
});
describe('copyByFileId()', () => {
it('should copy by file ID and return the file ID', async () => {
fsp.copyFile = jest.fn().mockResolvedValue(undefined);
// @ts-expect-error - private method
jest.spyOn(fsManager, 'toFileId').mockReturnValue(otherFileId);
const targetFileId = await fsManager.copyByFileId(workflowId, executionId, fileId);
const sourcePath = toFullFilePath(fileId);
const targetPath = toFullFilePath(targetFileId);
expect(fsp.copyFile).toHaveBeenCalledWith(sourcePath, targetPath);
});
});
describe('copyByFilePath()', () => {
test('should copy by file path and return the file ID and size', async () => {
const sourceFilePath = tmpdir();
const metadata = { mimeType: 'text/plain' };
// @ts-expect-error - private method
jest.spyOn(fsManager, 'toFileId').mockReturnValue(otherFileId);
// @ts-expect-error - private method
jest.spyOn(fsManager, 'getSize').mockReturnValue(mockBuffer.length);
const targetPath = toFullFilePath(otherFileId);
fsp.cp = jest.fn().mockResolvedValue(undefined);
const result = await fsManager.copyByFilePath(
workflowId,
executionId,
sourceFilePath,
metadata,
);
expect(fsp.cp).toHaveBeenCalledWith(sourceFilePath, targetPath);
expect(result.fileSize).toBe(mockBuffer.length);
});
});
describe('deleteMany()', () => {
it('should delete many files by workflow ID and execution ID', async () => {
const ids = [
{ workflowId, executionId },
{ workflowId: otherWorkflowId, executionId: otherExecutionId },
];
fsp.rm = jest.fn().mockResolvedValue(undefined);
const promise = fsManager.deleteMany(ids);
await expect(promise).resolves.not.toThrow();
expect(fsp.rm).toHaveBeenCalledTimes(2);
});
});
describe('rename()', () => {
it('should rename a file', async () => {
fsp.rename = jest.fn().mockResolvedValue(undefined);
fsp.rm = jest.fn().mockResolvedValue(undefined);
const promise = fsManager.rename(fileId, otherFileId);
const oldPath = toFullFilePath(fileId);
const newPath = toFullFilePath(otherFileId);
await expect(promise).resolves.not.toThrow();
expect(fsp.rename).toHaveBeenCalledTimes(2);
expect(fsp.rename).toHaveBeenCalledWith(oldPath, newPath);
expect(fsp.rename).toHaveBeenCalledWith(`${oldPath}.metadata`, `${newPath}.metadata`);
});
});

View File

@@ -102,12 +102,12 @@ describe('NodeExecuteFunctions', () => {
);
// Expect our return object to contain the name of the configured data manager.
expect(setBinaryDataBufferResponse.data).toEqual('filesystem');
expect(setBinaryDataBufferResponse.data).toEqual('filesystem-v2');
// Ensure that the input data was successfully persisted to disk.
expect(
readFileSync(
`${temporaryDir}/${setBinaryDataBufferResponse.id?.replace('filesystem:', '')}`,
`${temporaryDir}/${setBinaryDataBufferResponse.id?.replace('filesystem-v2:', '')}`,
),
).toEqual(inputData);

View File

@@ -2,16 +2,13 @@ import fs from 'node:fs/promises';
import { ObjectStoreManager } from '@/BinaryData/ObjectStore.manager';
import { ObjectStoreService } from '@/ObjectStore/ObjectStore.service.ee';
import { isStream } from '@/ObjectStore/utils';
import { mockInstance, toStream } from './utils';
import { mockInstance, toFileId, toStream } from './utils';
jest.mock('fs/promises');
const objectStoreService = mockInstance(ObjectStoreService);
const objectStoreManager = new ObjectStoreManager(objectStoreService);
const toFileId = (workflowId: string, executionId: string, fileUuid: string) =>
`workflows/${workflowId}/executions/${executionId}/binary_data/${fileUuid}`;
const workflowId = 'ObogjVbqpNOQpiyV';
const executionId = '999';
const fileUuid = '71f6209b-5d48-41a2-a224-80d529d8bb32';

View File

@@ -20,3 +20,6 @@ export function toStream(buffer: Buffer) {
return duplexStream;
}
export const toFileId = (workflowId: string, executionId: string, fileUuid: string) =>
`workflows/${workflowId}/executions/${executionId}/binary_data/${fileUuid}`;