feat(core): Make Redis available for backend communication (#6719)

* support redis cluster

* cleanup, fix config schema

* set default prefix to bull

* initial commit

* improve logging

* improve types and refactor

* list support and refactor

* fix redis service and tests

* add comment

* add redis and cache prefix

* use injection

* lint fix

* clean schema comments

* improve naming, tests, cluster client

* merge master

* cache returns unknown instead of T

* update cache service, tests and doc

* remove console.log

* do not cache null or undefined values

* fix merge

* lint fix
This commit is contained in:
Michael Auerswald
2023-08-02 12:51:25 +02:00
committed by GitHub
parent 4ac4b850dd
commit 3cad60e918
20 changed files with 1377 additions and 322 deletions

View File

@@ -3,9 +3,8 @@ import config from '@/config';
import { caching } from 'cache-manager';
import type { MemoryCache } from 'cache-manager';
import type { RedisCache } from 'cache-manager-ioredis-yet';
import type { RedisOptions } from 'ioredis';
import { getRedisClusterNodes } from '../GenericHelpers';
import { LoggerProxy, jsonStringify } from 'n8n-workflow';
import { jsonStringify } from 'n8n-workflow';
import { getDefaultRedisClient, getRedisPrefix } from './redis/RedisServiceHelper';
@Service()
export class CacheService {
@@ -15,80 +14,33 @@ export class CacheService {
*/
private cache: RedisCache | MemoryCache | undefined;
async init() {
isRedisCache(): boolean {
return (this.cache as RedisCache)?.store?.isCacheable !== undefined;
}
/**
* Initialize the cache service.
*
* If the cache is enabled, it will initialize the cache from the provided config options. By default, it will use
* the `memory` backend and create a simple in-memory cache. If running in `queue` mode, or if `redis` backend is selected,
* it use Redis as the cache backend (either a local Redis instance or a Redis cluster, depending on the config)
*
* If the cache is disabled, this does nothing.
*/
async init(): Promise<void> {
if (!config.getEnv('cache.enabled')) {
throw new Error('Cache is disabled');
return;
}
const backend = config.getEnv('cache.backend');
if (
backend === 'redis' ||
(backend === 'auto' && config.getEnv('executions.mode') === 'queue')
) {
const { redisInsStore } = await import('cache-manager-ioredis-yet');
// #region TEMPORARY Redis Client Code
/*
* TODO: remove once redis service is ready
* this code is just temporary
*/
// eslint-disable-next-line @typescript-eslint/naming-convention
const { default: Redis } = await import('ioredis');
let lastTimer = 0;
let cumulativeTimeout = 0;
const { host, port, username, password, db }: RedisOptions =
config.getEnv('queue.bull.redis');
const clusterNodes = getRedisClusterNodes();
const redisConnectionTimeoutLimit = config.getEnv('queue.bull.redis.timeoutThreshold');
const usesRedisCluster = clusterNodes.length > 0;
LoggerProxy.debug(
usesRedisCluster
? `(Cache Service) Initialising Redis cluster connection with nodes: ${clusterNodes
.map((e) => `${e.host}:${e.port}`)
.join(',')}`
: `(Cache Service) Initialising Redis client connection with host: ${
host ?? 'localhost'
} and port: ${port ?? '6379'}`,
);
const sharedRedisOptions: RedisOptions = {
username,
password,
db,
enableReadyCheck: false,
maxRetriesPerRequest: null,
};
const redisClient = usesRedisCluster
? new Redis.Cluster(
clusterNodes.map((node) => ({ host: node.host, port: node.port })),
{
redisOptions: sharedRedisOptions,
},
)
: new Redis({
host,
port,
...sharedRedisOptions,
retryStrategy: (): number | null => {
const now = Date.now();
if (now - lastTimer > 30000) {
// Means we had no timeout at all or last timeout was temporary and we recovered
lastTimer = now;
cumulativeTimeout = 0;
} else {
cumulativeTimeout += now - lastTimer;
lastTimer = now;
if (cumulativeTimeout > redisConnectionTimeoutLimit) {
LoggerProxy.error(
`Unable to connect to Redis after ${redisConnectionTimeoutLimit}. Exiting process.`,
);
process.exit(1);
}
}
return 500;
},
});
// #endregion TEMPORARY Redis Client Code
const redisPrefix = getRedisPrefix(config.getEnv('redis.prefix'));
const cachePrefix = config.getEnv('cache.redis.prefix');
const keyPrefix = `${redisPrefix}:${cachePrefix}:`;
const redisClient = await getDefaultRedisClient({ keyPrefix }, 'client(cache)');
const redisStore = redisInsStore(redisClient, {
ttl: config.getEnv('cache.redis.ttl'),
});
@@ -106,6 +58,163 @@ export class CacheService {
}
}
/**
* Get a value from the cache by key.
*
* If the value is not in the cache or expired, the refreshFunction is called if defined,
* which will set the key with the function's result and returns it. If no refreshFunction is set, the fallback value is returned.
*
* If the cache is disabled, refreshFunction's result or fallbackValue is returned.
*
* If cache is not hit, and neither refreshFunction nor fallbackValue are provided, `undefined` is returned.
* @param key The key to fetch from the cache
* @param options.refreshFunction Optional function to call to set the cache if the key is not found
* @param options.refreshTtl Optional ttl for the refreshFunction's set call
* @param options.fallbackValue Optional value returned is cache is not hit and refreshFunction is not provided
*/
async get(
key: string,
options: {
fallbackValue?: unknown;
refreshFunction?: (key: string) => Promise<unknown>;
refreshTtl?: number;
} = {},
): Promise<unknown> {
const value = await this.cache?.store.get(key);
if (value !== undefined) {
return value;
}
if (options.refreshFunction) {
const refreshValue = await options.refreshFunction(key);
await this.set(key, refreshValue, options.refreshTtl);
return refreshValue;
}
return options.fallbackValue ?? undefined;
}
/**
* Get many values from a list of keys.
*
* If a value is not in the cache or expired, the returned list will have `undefined` at that index.
* If the cache is disabled, refreshFunction's result or fallbackValue is returned.
* If cache is not hit, and neither refreshFunction nor fallbackValue are provided, a list of `undefined` is returned.
* @param keys A list of keys to fetch from the cache
* @param options.refreshFunctionEach Optional, if defined, undefined values will be replaced with the result of the refreshFunctionEach call and the cache will be updated
* @param options.refreshFunctionMany Optional, if defined, all values will be replaced with the result of the refreshFunctionMany call and the cache will be updated
* @param options.refreshTtl Optional ttl for the refreshFunction's set call
* @param options.fallbackValue Optional value returned is cache is not hit and refreshFunction is not provided
*/
async getMany(
keys: string[],
options: {
fallbackValues?: unknown[];
refreshFunctionEach?: (key: string) => Promise<unknown>;
refreshFunctionMany?: (keys: string[]) => Promise<unknown[]>;
refreshTtl?: number;
} = {},
): Promise<unknown[]> {
let values = await this.cache?.store.mget(...keys);
if (values === undefined) {
values = keys.map(() => undefined);
}
if (!values.includes(undefined)) {
return values;
}
if (options.refreshFunctionEach) {
for (let i = 0; i < keys.length; i++) {
if (values[i] === undefined) {
const key = keys[i];
let fallback = undefined;
if (options.fallbackValues && options.fallbackValues.length > i) {
fallback = options.fallbackValues[i];
}
const refreshValue = await this.get(key, {
refreshFunction: options.refreshFunctionEach,
refreshTtl: options.refreshTtl,
fallbackValue: fallback,
});
values[i] = refreshValue;
}
}
return values;
}
if (options.refreshFunctionMany) {
const refreshValues: unknown[] = await options.refreshFunctionMany(keys);
if (keys.length !== refreshValues.length) {
throw new Error('refreshFunctionMany must return the same number of values as keys');
}
const newKV: Array<[string, unknown]> = [];
for (let i = 0; i < keys.length; i++) {
newKV.push([keys[i], refreshValues[i]]);
}
await this.setMany(newKV, options.refreshTtl);
return refreshValues;
}
return options.fallbackValues ?? values;
}
/**
* Set a value in the cache by key.
* @param key The key to set
* @param value The value to set
* @param ttl Optional time to live in ms
*/
async set(key: string, value: unknown, ttl?: number): Promise<void> {
if (!this.cache) {
await this.init();
}
if (value === undefined || value === null) {
return;
}
if (this.isRedisCache()) {
if (!(this.cache as RedisCache)?.store?.isCacheable(value)) {
throw new Error('Value is not cacheable');
}
}
await this.cache?.store.set(key, value, ttl);
}
/**
* Set a multiple values in the cache at once.
* @param values An array of [key, value] tuples to set
* @param ttl Optional time to live in ms
*/
async setMany(values: Array<[string, unknown]>, ttl?: number): Promise<void> {
if (!this.cache) {
await this.init();
}
// eslint-disable-next-line @typescript-eslint/naming-convention
const nonNullValues = values.filter(([_key, value]) => value !== undefined && value !== null);
if (this.isRedisCache()) {
// eslint-disable-next-line @typescript-eslint/naming-convention
nonNullValues.forEach(([_key, value]) => {
if (!(this.cache as RedisCache)?.store?.isCacheable(value)) {
throw new Error('Value is not cacheable');
}
});
}
await this.cache?.store.mset(nonNullValues, ttl);
}
/**
* Delete a value from the cache by key.
* @param key The key to delete
*/
async delete(key: string): Promise<void> {
await this.cache?.store.del(key);
}
/**
* Delete multiple values from the cache.
* @param keys List of keys to delete
*/
async deleteMany(keys: string[]): Promise<void> {
return this.cache?.store.mdel(...keys);
}
/**
* Delete all values and uninitialized the cache.
*/
async destroy() {
if (this.cache) {
await this.reset();
@@ -113,6 +222,22 @@ export class CacheService {
}
}
/**
* Enable and initialize the cache.
*/
async enable() {
config.set('cache.enabled', true);
await this.init();
}
/**
* Disable and destroy the cache.
*/
async disable() {
config.set('cache.enabled', false);
await this.destroy();
}
async getCache(): Promise<RedisCache | MemoryCache | undefined> {
if (!this.cache) {
await this.init();
@@ -120,59 +245,35 @@ export class CacheService {
return this.cache;
}
async get<T>(key: string): Promise<T> {
if (!this.cache) {
await this.init();
}
return this.cache?.store.get(key) as T;
}
async set<T>(key: string, value: T, ttl?: number): Promise<void> {
if (!this.cache) {
await this.init();
}
return this.cache?.store.set(key, value, ttl);
}
async delete(key: string): Promise<void> {
if (!this.cache) {
await this.init();
}
return this.cache?.store.del(key);
}
/**
* Delete all values from the cache, but leave the cache initialized.
*/
async reset(): Promise<void> {
if (!this.cache) {
await this.init();
}
return this.cache?.store.reset();
await this.cache?.store.reset();
}
/**
* Return all keys in the cache.
*/
async keys(): Promise<string[]> {
if (!this.cache) {
await this.init();
}
return this.cache?.store.keys() ?? [];
}
async setMany<T>(values: Array<[string, T]>, ttl?: number): Promise<void> {
if (!this.cache) {
await this.init();
/**
* Return all key/value pairs in the cache. This is a potentially very expensive operation and is only meant to be used for debugging
*/
async keyValues(): Promise<Map<string, unknown>> {
const keys = await this.keys();
const values = await this.getMany(keys);
const map = new Map<string, unknown>();
if (keys.length === values.length) {
for (let i = 0; i < keys.length; i++) {
map.set(keys[i], values[i]);
}
return map;
}
return this.cache?.store.mset(values, ttl);
}
async getMany<T>(keys: string[]): Promise<Array<[string, T]>> {
if (!this.cache) {
await this.init();
}
return this.cache?.store.mget(...keys) as Promise<Array<[string, T]>>;
}
async deleteMany(keys: string[]): Promise<void> {
if (!this.cache) {
await this.init();
}
return this.cache?.store.mdel(...keys);
throw new Error(
'Keys and values do not match, this should not happen and appears to result from some cache corruption.',
);
}
}