refactor(core): Centralize SSH Tunnel management (#9906)

Co-authored-by: Michael Kret <michael.k@radency.com>
This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™
2024-07-04 12:29:44 +02:00
committed by GitHub
parent 86018aa6e0
commit 85aa560a5d
25 changed files with 525 additions and 630 deletions

View File

@@ -6,8 +6,9 @@ import type {
INodeListSearchResult,
INodeListSearchItems,
} from 'n8n-workflow';
import pgPromise from 'pg-promise';
import type pg from 'pg-promise/typescript/pg-subset';
import type { PgpDatabase, PostgresNodeCredentials } from './v2/helpers/interfaces';
import { configurePostgres } from './v2/transport';
export function prepareNames(id: string, mode: string, additionalFields: IDataObject) {
let suffix = id.replace(/-/g, '_');
@@ -35,7 +36,7 @@ export function prepareNames(id: string, mode: string, additionalFields: IDataOb
export async function pgTriggerFunction(
this: ITriggerFunctions,
db: pgPromise.IDatabase<{}, pg.IClient>,
db: PgpDatabase,
additionalFields: IDataObject,
functionName: string,
triggerName: string,
@@ -86,43 +87,12 @@ export async function pgTriggerFunction(
}
export async function initDB(this: ITriggerFunctions | ILoadOptionsFunctions) {
const credentials = await this.getCredentials('postgres');
const credentials = (await this.getCredentials('postgres')) as PostgresNodeCredentials;
const options = this.getNodeParameter('options', {}) as {
connectionTimeout?: number;
delayClosingIdleConnection?: number;
};
const pgp = pgPromise({
// prevent spam in console "WARNING: Creating a duplicate database object for the same connection."
noWarnings: true,
});
const config: IDataObject = {
host: credentials.host as string,
port: credentials.port as number,
database: credentials.database as string,
user: credentials.user as string,
password: credentials.password as string,
keepAlive: true,
};
if (options.connectionTimeout) {
config.connectionTimeoutMillis = options.connectionTimeout * 1000;
}
if (options.delayClosingIdleConnection) {
config.keepAliveInitialDelayMillis = options.delayClosingIdleConnection * 1000;
}
if (credentials.allowUnauthorizedCerts === true) {
config.ssl = {
rejectUnauthorized: false,
};
} else {
config.ssl = !['disable', undefined].includes(credentials.ssl as string | undefined);
config.sslmode = (credentials.ssl as string) || 'disable';
}
const db = pgp(config);
return { db, pgp };
return await configurePostgres.call(this, credentials, options);
}
export async function searchSchema(this: ILoadOptionsFunctions): Promise<INodeListSearchResult> {

View File

@@ -21,7 +21,7 @@ export async function router(this: IExecuteFunctions): Promise<INodeExecutionDat
options.nodeVersion = node.typeVersion;
options.operation = operation;
const { db, pgp, sshClient } = await configurePostgres(credentials, options);
const { db, pgp } = await configurePostgres.call(this, credentials, options);
const runQueries = configureQueryRunner.call(
this,
@@ -53,13 +53,7 @@ export async function router(this: IExecuteFunctions): Promise<INodeExecutionDat
`The operation "${operation}" is not supported!`,
);
}
} catch (error) {
throw error;
} finally {
if (sshClient) {
sshClient.end();
}
if (!db.$pool.ending) await db.$pool.end();
}

View File

@@ -1,7 +1,6 @@
import type { IDataObject, INodeExecutionData } from 'n8n-workflow';
import type { IDataObject, INodeExecutionData, SSHCredentials } from 'n8n-workflow';
import type pgPromise from 'pg-promise';
import type pg from 'pg-promise/typescript/pg-subset';
import type { Client } from 'ssh2';
export type QueryMode = 'single' | 'transaction' | 'independently';
@@ -28,7 +27,7 @@ export type EnumInfo = {
export type PgpClient = pgPromise.IMain<{}, pg.IClient>;
export type PgpDatabase = pgPromise.IDatabase<{}, pg.IClient>;
export type PgpConnectionParameters = pg.IConnectionParameters<pg.IClient>;
export type ConnectionsData = { db: PgpDatabase; pgp: PgpClient; sshClient?: Client };
export type ConnectionsData = { db: PgpDatabase; pgp: PgpClient };
export type QueriesRunner = (
queries: QueryWithValues[],
@@ -51,7 +50,6 @@ export type PostgresNodeOptions = {
};
export type PostgresNodeCredentials = {
sshAuthenticateWith: 'password' | 'privateKey';
host: string;
port: number;
database: string;
@@ -59,12 +57,9 @@ export type PostgresNodeCredentials = {
password: string;
allowUnauthorizedCerts?: boolean;
ssl?: 'disable' | 'allow' | 'require' | 'verify' | 'verify-full';
sshTunnel?: boolean;
sshHost?: string;
sshPort?: number;
sshPostgresPort?: number;
sshUser?: string;
sshPassword?: string;
privateKey?: string;
passphrase?: string;
};
} & (
| { sshTunnel: false }
| ({
sshTunnel: true;
} & SSHCredentials)
);

View File

@@ -4,7 +4,6 @@ import type {
INodeCredentialTestResult,
} from 'n8n-workflow';
import { Client } from 'ssh2';
import { configurePostgres } from '../transport';
import type { PgpClient, PostgresNodeCredentials } from '../helpers/interfaces';
@@ -15,13 +14,11 @@ export async function postgresConnectionTest(
): Promise<INodeCredentialTestResult> {
const credentials = credential.data as PostgresNodeCredentials;
let sshClientCreated: Client | undefined = new Client();
let pgpClientCreated: PgpClient | undefined;
try {
const { db, pgp, sshClient } = await configurePostgres(credentials, {}, sshClientCreated);
const { db, pgp } = await configurePostgres.call(this, credentials, {});
sshClientCreated = sshClient;
pgpClientCreated = pgp;
await db.connect();
@@ -45,9 +42,6 @@ export async function postgresConnectionTest(
message,
};
} finally {
if (sshClientCreated) {
sshClientCreated.end();
}
if (pgpClientCreated) {
pgpClientCreated.end();
}

View File

@@ -7,7 +7,7 @@ export async function schemaSearch(this: ILoadOptionsFunctions): Promise<INodeLi
const credentials = (await this.getCredentials('postgres')) as PostgresNodeCredentials;
const options = { nodeVersion: this.getNode().typeVersion };
const { db, sshClient } = await configurePostgres(credentials, options);
const { db } = await configurePostgres.call(this, credentials, options);
try {
const response = await db.any('SELECT schema_name FROM information_schema.schemata');
@@ -18,12 +18,7 @@ export async function schemaSearch(this: ILoadOptionsFunctions): Promise<INodeLi
value: schema.schema_name as string,
})),
};
} catch (error) {
throw error;
} finally {
if (sshClient) {
sshClient.end();
}
if (!db.$pool.ending) await db.$pool.end();
}
}
@@ -31,7 +26,7 @@ export async function tableSearch(this: ILoadOptionsFunctions): Promise<INodeLis
const credentials = (await this.getCredentials('postgres')) as PostgresNodeCredentials;
const options = { nodeVersion: this.getNode().typeVersion };
const { db, sshClient } = await configurePostgres(credentials, options);
const { db } = await configurePostgres.call(this, credentials, options);
const schema = this.getNodeParameter('schema', 0, {
extractValue: true,
@@ -49,12 +44,7 @@ export async function tableSearch(this: ILoadOptionsFunctions): Promise<INodeLis
value: table.table_name as string,
})),
};
} catch (error) {
throw error;
} finally {
if (sshClient) {
sshClient.end();
}
if (!db.$pool.ending) await db.$pool.end();
}
}

View File

@@ -8,7 +8,7 @@ export async function getColumns(this: ILoadOptionsFunctions): Promise<INodeProp
const credentials = (await this.getCredentials('postgres')) as PostgresNodeCredentials;
const options = { nodeVersion: this.getNode().typeVersion };
const { db, sshClient } = await configurePostgres(credentials, options);
const { db } = await configurePostgres.call(this, credentials, options);
const schema = this.getNodeParameter('schema', 0, {
extractValue: true,
@@ -26,12 +26,7 @@ export async function getColumns(this: ILoadOptionsFunctions): Promise<INodeProp
value: column.column_name,
description: `Type: ${column.data_type.toUpperCase()}, Nullable: ${column.is_nullable}`,
}));
} catch (error) {
throw error;
} finally {
if (sshClient) {
sshClient.end();
}
if (!db.$pool.ending) await db.$pool.end();
}
}

View File

@@ -48,7 +48,7 @@ export async function getMappingColumns(
): Promise<ResourceMapperFields> {
const credentials = (await this.getCredentials('postgres')) as PostgresNodeCredentials;
const { db, sshClient } = await configurePostgres(credentials);
const { db } = await configurePostgres.call(this, credentials);
const schema = this.getNodeParameter('schema', 0, {
extractValue: true,
@@ -89,12 +89,7 @@ export async function getMappingColumns(
}),
);
return { fields };
} catch (error) {
throw error;
} finally {
if (sshClient) {
sshClient.end();
}
if (!db.$pool.ending) await db.$pool.end();
}
}

View File

@@ -1,72 +1,26 @@
import type { Server } from 'net';
import { createServer } from 'net';
import { Client } from 'ssh2';
import type { ConnectConfig } from 'ssh2';
import type { IDataObject } from 'n8n-workflow';
import { createServer, type AddressInfo } from 'node:net';
import pgPromise from 'pg-promise';
import type {
PgpDatabase,
IExecuteFunctions,
ICredentialTestFunctions,
ILoadOptionsFunctions,
ITriggerFunctions,
} from 'n8n-workflow';
import { formatPrivateKey } from '@utils/utilities';
import type {
ConnectionsData,
PgpConnectionParameters,
PostgresNodeCredentials,
PostgresNodeOptions,
} from '../helpers/interfaces';
import { formatPrivateKey } from '@utils/utilities';
import { LOCALHOST } from '@utils/constants';
async function createSshConnectConfig(credentials: PostgresNodeCredentials) {
if (credentials.sshAuthenticateWith === 'password') {
return {
host: credentials.sshHost as string,
port: credentials.sshPort as number,
username: credentials.sshUser as string,
password: credentials.sshPassword as string,
} as ConnectConfig;
} else {
const options: ConnectConfig = {
host: credentials.sshHost as string,
username: credentials.sshUser as string,
port: credentials.sshPort as number,
privateKey: formatPrivateKey(credentials.privateKey as string),
};
if (credentials.passphrase) {
options.passphrase = credentials.passphrase;
}
return options;
}
}
export async function configurePostgres(
const getPostgresConfig = (
credentials: PostgresNodeCredentials,
options: PostgresNodeOptions = {},
createdSshClient?: Client,
) {
const pgp = pgPromise({
// prevent spam in console "WARNING: Creating a duplicate database object for the same connection."
// duplicate connections created when auto loading parameters, they are closed imidiatly after, but several could be open at the same time
noWarnings: true,
});
if (typeof options.nodeVersion === 'number' && options.nodeVersion >= 2.1) {
// Always return dates as ISO strings
[pgp.pg.types.builtins.TIMESTAMP, pgp.pg.types.builtins.TIMESTAMPTZ].forEach((type) => {
pgp.pg.types.setTypeParser(type, (value: string) => {
return new Date(value).toISOString();
});
});
}
if (options.largeNumbersOutput === 'numbers') {
pgp.pg.types.setTypeParser(20, (value: string) => {
return parseInt(value, 10);
});
pgp.pg.types.setTypeParser(1700, (value: string) => {
return parseFloat(value);
});
}
const dbConfig: IDataObject = {
) => {
const dbConfig: PgpConnectionParameters = {
host: credentials.host,
port: credentials.port,
database: credentials.database,
@@ -89,70 +43,91 @@ export async function configurePostgres(
};
} else {
dbConfig.ssl = !['disable', undefined].includes(credentials.ssl as string | undefined);
// @ts-ignore these typings need to be updated
dbConfig.sslmode = credentials.ssl || 'disable';
}
return dbConfig;
};
export async function configurePostgres(
this: IExecuteFunctions | ICredentialTestFunctions | ILoadOptionsFunctions | ITriggerFunctions,
credentials: PostgresNodeCredentials,
options: PostgresNodeOptions = {},
): Promise<ConnectionsData> {
const pgp = pgPromise({
// prevent spam in console "WARNING: Creating a duplicate database object for the same connection."
// duplicate connections created when auto loading parameters, they are closed immediately after, but several could be open at the same time
noWarnings: true,
});
if (typeof options.nodeVersion === 'number' && options.nodeVersion >= 2.1) {
// Always return dates as ISO strings
[pgp.pg.types.builtins.TIMESTAMP, pgp.pg.types.builtins.TIMESTAMPTZ].forEach((type) => {
pgp.pg.types.setTypeParser(type, (value: string) => {
return new Date(value).toISOString();
});
});
}
if (options.largeNumbersOutput === 'numbers') {
pgp.pg.types.setTypeParser(20, (value: string) => {
return parseInt(value, 10);
});
pgp.pg.types.setTypeParser(1700, (value: string) => {
return parseFloat(value);
});
}
const dbConfig = getPostgresConfig(credentials, options);
if (!credentials.sshTunnel) {
const db = pgp(dbConfig);
return { db, pgp };
} else {
const sshClient = createdSshClient || new Client();
if (credentials.sshAuthenticateWith === 'privateKey' && credentials.privateKey) {
credentials.privateKey = formatPrivateKey(credentials.privateKey);
}
const sshClient = await this.helpers.getSSHClient(credentials);
const tunnelConfig = await createSshConnectConfig(credentials);
// Create a TCP proxy listening on a random available port
const proxy = createServer();
const proxyPort = await new Promise<number>((resolve) => {
proxy.listen(0, LOCALHOST, () => {
resolve((proxy.address() as AddressInfo).port);
});
});
const localHost = '127.0.0.1';
const localPort = credentials.sshPostgresPort as number;
let proxy: Server | undefined;
const db = await new Promise<PgpDatabase>((resolve, reject) => {
let sshClientReady = false;
proxy = createServer((socket) => {
if (!sshClientReady) return socket.destroy();
const close = () => {
proxy.close();
sshClient.off('end', close);
sshClient.off('error', close);
};
sshClient.on('end', close);
sshClient.on('error', close);
await new Promise<void>((resolve, reject) => {
proxy.on('error', (err) => reject(err));
proxy.on('connection', (localSocket) => {
sshClient.forwardOut(
socket.remoteAddress as string,
socket.remotePort as number,
LOCALHOST,
localSocket.remotePort!,
credentials.host,
credentials.port,
(err, stream) => {
if (err) reject(err);
socket.pipe(stream);
stream.pipe(socket);
(err, clientChannel) => {
if (err) {
proxy.close();
localSocket.destroy();
} else {
localSocket.pipe(clientChannel);
clientChannel.pipe(localSocket);
}
},
);
}).listen(localPort, localHost);
proxy.on('error', (err) => {
reject(err);
});
sshClient.connect(tunnelConfig);
sshClient.on('ready', () => {
sshClientReady = true;
const updatedDbConfig = {
...dbConfig,
port: localPort,
host: localHost,
};
const dbConnection = pgp(updatedDbConfig);
resolve(dbConnection);
});
sshClient.on('error', (err) => {
reject(err);
});
sshClient.on('end', async () => {
if (proxy) proxy.close();
});
resolve();
}).catch((err) => {
if (proxy) proxy.close();
if (sshClient) sshClient.end();
proxy.close();
let message = err.message;
let description = err.description;
@@ -183,6 +158,11 @@ export async function configurePostgres(
throw err;
});
return { db, pgp, sshClient };
const db = pgp({
...dbConfig,
port: proxyPort,
host: LOCALHOST,
});
return { db, pgp };
}
}