feat(API): Report unhandled app crashes to Sentry (#4548)

* SIGTERM/SIGINT should only be handled once

* move error-handling initialization to commands

* create a new `sleep` function in workflow utils

* detect crashes and report them to Sentry
This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2022-11-08 17:06:00 +01:00 committed by GitHub
parent 5d852f9230
commit 2425c10b2b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 129 additions and 73 deletions

View file

@ -11,8 +11,7 @@ import { Command, flags } from '@oclif/command';
import { BinaryDataManager, UserSettings } from 'n8n-core'; import { BinaryDataManager, UserSettings } from 'n8n-core';
// eslint-disable-next-line @typescript-eslint/no-unused-vars import { ITaskData, LoggerProxy, sleep } from 'n8n-workflow';
import { INode, ITaskData, LoggerProxy } from 'n8n-workflow';
import { sep } from 'path'; import { sep } from 'path';
@ -147,9 +146,7 @@ export class ExecuteBatch extends Command {
}); });
} }
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
await new Promise((resolve) => { await sleep(500);
setTimeout(resolve, 500);
});
executingWorkflows = activeExecutionsInstance.getActiveExecutions(); executingWorkflows = activeExecutionsInstance.getActiveExecutions();
} }
// We may receive true but when called from `process.on` // We may receive true but when called from `process.on`
@ -192,8 +189,8 @@ export class ExecuteBatch extends Command {
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
async run() { async run() {
process.on('SIGTERM', ExecuteBatch.stopProcess); process.once('SIGTERM', ExecuteBatch.stopProcess);
process.on('SIGINT', ExecuteBatch.stopProcess); process.once('SIGINT', ExecuteBatch.stopProcess);
const logger = getLogger(); const logger = getLogger();
LoggerProxy.init(logger); LoggerProxy.init(logger);

View file

@ -12,7 +12,7 @@ import { Command, flags } from '@oclif/command';
// eslint-disable-next-line import/no-extraneous-dependencies // eslint-disable-next-line import/no-extraneous-dependencies
import Redis from 'ioredis'; import Redis from 'ioredis';
import { IDataObject, LoggerProxy } from 'n8n-workflow'; import { IDataObject, LoggerProxy, sleep } from 'n8n-workflow';
import { createHash } from 'crypto'; import { createHash } from 'crypto';
import config from '../config'; import config from '../config';
import { import {
@ -34,6 +34,8 @@ import {
import { getLogger } from '../src/Logger'; import { getLogger } from '../src/Logger';
import { getAllInstalledPackages } from '../src/CommunityNodes/packageModel'; import { getAllInstalledPackages } from '../src/CommunityNodes/packageModel';
import { initErrorHandling } from '../src/ErrorReporting';
import * as CrashJournal from '../src/CrashJournal';
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
const open = require('open'); const open = require('open');
@ -83,7 +85,7 @@ export class Start extends Command {
} }
/** /**
* Stoppes the n8n in a graceful way. * Stop n8n in a graceful way.
* Make for example sure that all the webhooks from third party services * Make for example sure that all the webhooks from third party services
* get removed. * get removed.
*/ */
@ -91,6 +93,12 @@ export class Start extends Command {
static async stopProcess() { static async stopProcess() {
getLogger().info('\nStopping n8n...'); getLogger().info('\nStopping n8n...');
const exit = () => {
CrashJournal.cleanup().finally(() => {
process.exit(processExitCode);
});
};
try { try {
// Stop with trying to activate workflows that could not be activated // Stop with trying to activate workflows that could not be activated
activeWorkflowRunner?.removeAllQueuedWorkflowActivations(); activeWorkflowRunner?.removeAllQueuedWorkflowActivations();
@ -102,7 +110,7 @@ export class Start extends Command {
// In case that something goes wrong with shutdown we // In case that something goes wrong with shutdown we
// kill after max. 30 seconds no matter what // kill after max. 30 seconds no matter what
console.log(`process exited after 30s`); console.log(`process exited after 30s`);
process.exit(processExitCode); exit();
}, 30000); }, 30000);
await InternalHooksManager.getInstance().onN8nStop(); await InternalHooksManager.getInstance().onN8nStop();
@ -136,22 +144,27 @@ export class Start extends Command {
}); });
} }
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
await new Promise((resolve) => { await sleep(500);
setTimeout(resolve, 500);
});
executingWorkflows = activeExecutionsInstance.getActiveExecutions(); executingWorkflows = activeExecutionsInstance.getActiveExecutions();
} }
} catch (error) { } catch (error) {
console.error('There was an error shutting down n8n.', error); console.error('There was an error shutting down n8n.', error);
} }
process.exit(processExitCode); exit();
} }
async run() { async run() {
// Make sure that n8n shuts down gracefully if possible // Make sure that n8n shuts down gracefully if possible
process.on('SIGTERM', Start.stopProcess); process.once('SIGTERM', Start.stopProcess);
process.on('SIGINT', Start.stopProcess); process.once('SIGINT', Start.stopProcess);
const logger = getLogger();
LoggerProxy.init(logger);
logger.info('Initializing n8n process');
initErrorHandling();
await CrashJournal.init();
// eslint-disable-next-line @typescript-eslint/no-shadow // eslint-disable-next-line @typescript-eslint/no-shadow
const { flags } = this.parse(Start); const { flags } = this.parse(Start);
@ -159,10 +172,6 @@ export class Start extends Command {
// Wrap that the process does not close but we can still use async // Wrap that the process does not close but we can still use async
await (async () => { await (async () => {
try { try {
const logger = getLogger();
LoggerProxy.init(logger);
logger.info('Initializing n8n process');
// Start directly with the init of the database to improve startup time // Start directly with the init of the database to improve startup time
const startDbInitPromise = Db.init().catch((error: Error) => { const startDbInitPromise = Db.init().catch((error: Error) => {
logger.error(`There was an error initializing DB: "${error.message}"`); logger.error(`There was an error initializing DB: "${error.message}"`);

View file

@ -9,7 +9,7 @@ import { Command, flags } from '@oclif/command';
// eslint-disable-next-line import/no-extraneous-dependencies // eslint-disable-next-line import/no-extraneous-dependencies
import Redis from 'ioredis'; import Redis from 'ioredis';
import { IDataObject, LoggerProxy } from 'n8n-workflow'; import { IDataObject, LoggerProxy, sleep } from 'n8n-workflow';
import config from '../config'; import config from '../config';
import { import {
ActiveExecutions, ActiveExecutions,
@ -26,9 +26,11 @@ import {
} from '../src'; } from '../src';
import { getLogger } from '../src/Logger'; import { getLogger } from '../src/Logger';
import { initErrorHandling } from '../src/ErrorReporting';
import * as CrashJournal from '../src/CrashJournal';
let activeWorkflowRunner: ActiveWorkflowRunner.ActiveWorkflowRunner | undefined; let activeWorkflowRunner: ActiveWorkflowRunner.ActiveWorkflowRunner | undefined;
let processExistCode = 0; let processExitCode = 0;
export class Webhook extends Command { export class Webhook extends Command {
static description = 'Starts n8n webhook process. Intercepts only production URLs.'; static description = 'Starts n8n webhook process. Intercepts only production URLs.';
@ -40,7 +42,7 @@ export class Webhook extends Command {
}; };
/** /**
* Stops the n8n in a graceful way. * Stops n8n in a graceful way.
* Make for example sure that all the webhooks from third party services * Make for example sure that all the webhooks from third party services
* get removed. * get removed.
*/ */
@ -48,6 +50,12 @@ export class Webhook extends Command {
static async stopProcess() { static async stopProcess() {
LoggerProxy.info(`\nStopping n8n...`); LoggerProxy.info(`\nStopping n8n...`);
const exit = () => {
CrashJournal.cleanup().finally(() => {
process.exit(processExitCode);
});
};
try { try {
const externalHooks = ExternalHooks(); const externalHooks = ExternalHooks();
await externalHooks.run('n8n.stop', []); await externalHooks.run('n8n.stop', []);
@ -55,7 +63,7 @@ export class Webhook extends Command {
setTimeout(() => { setTimeout(() => {
// In case that something goes wrong with shutdown we // In case that something goes wrong with shutdown we
// kill after max. 30 seconds no matter what // kill after max. 30 seconds no matter what
process.exit(processExistCode); exit();
}, 30000); }, 30000);
// Wait for active workflow executions to finish // Wait for active workflow executions to finish
@ -70,16 +78,14 @@ export class Webhook extends Command {
); );
} }
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
await new Promise((resolve) => { await sleep(500);
setTimeout(resolve, 500);
});
executingWorkflows = activeExecutionsInstance.getActiveExecutions(); executingWorkflows = activeExecutionsInstance.getActiveExecutions();
} }
} catch (error) { } catch (error) {
LoggerProxy.error('There was an error shutting down n8n.', error); LoggerProxy.error('There was an error shutting down n8n.', error);
} }
process.exit(processExistCode); exit();
} }
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
@ -88,8 +94,11 @@ export class Webhook extends Command {
LoggerProxy.init(logger); LoggerProxy.init(logger);
// Make sure that n8n shuts down gracefully if possible // Make sure that n8n shuts down gracefully if possible
process.on('SIGTERM', Webhook.stopProcess); process.once('SIGTERM', Webhook.stopProcess);
process.on('SIGINT', Webhook.stopProcess); process.once('SIGINT', Webhook.stopProcess);
initErrorHandling();
await CrashJournal.init();
// eslint-disable-next-line @typescript-eslint/no-unused-vars, @typescript-eslint/no-shadow // eslint-disable-next-line @typescript-eslint/no-unused-vars, @typescript-eslint/no-shadow
const { flags } = this.parse(Webhook); const { flags } = this.parse(Webhook);
@ -118,7 +127,7 @@ export class Webhook extends Command {
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions, @typescript-eslint/no-unsafe-member-access // eslint-disable-next-line @typescript-eslint/restrict-template-expressions, @typescript-eslint/no-unsafe-member-access
logger.error(`There was an error initializing DB: "${error.message}"`); logger.error(`There was an error initializing DB: "${error.message}"`);
processExistCode = 1; processExitCode = 1;
// @ts-ignore // @ts-ignore
process.emit('SIGINT'); process.emit('SIGINT');
process.exit(1); process.exit(1);
@ -230,7 +239,7 @@ export class Webhook extends Command {
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
logger.error(`Webhook process cannot continue. "${error.message}"`); logger.error(`Webhook process cannot continue. "${error.message}"`);
processExistCode = 1; processExitCode = 1;
// @ts-ignore // @ts-ignore
process.emit('SIGINT'); process.emit('SIGINT');
process.exit(1); process.exit(1);

View file

@ -15,7 +15,14 @@ import PCancelable from 'p-cancelable';
import { Command, flags } from '@oclif/command'; import { Command, flags } from '@oclif/command';
import { BinaryDataManager, UserSettings, WorkflowExecute } from 'n8n-core'; import { BinaryDataManager, UserSettings, WorkflowExecute } from 'n8n-core';
import { IExecuteResponsePromiseData, INodeTypes, IRun, Workflow, LoggerProxy } from 'n8n-workflow'; import {
IExecuteResponsePromiseData,
INodeTypes,
IRun,
Workflow,
LoggerProxy,
sleep,
} from 'n8n-workflow';
import { FindOneOptions, getConnectionManager } from 'typeorm'; import { FindOneOptions, getConnectionManager } from 'typeorm';
@ -62,11 +69,11 @@ export class Worker extends Command {
static jobQueue: Queue.JobQueue; static jobQueue: Queue.JobQueue;
static processExistCode = 0; static processExitCode = 0;
// static activeExecutions = ActiveExecutions.getInstance(); // static activeExecutions = ActiveExecutions.getInstance();
/** /**
* Stoppes the n8n in a graceful way. * Stop n8n in a graceful way.
* Make for example sure that all the webhooks from third party services * Make for example sure that all the webhooks from third party services
* get removed. * get removed.
*/ */
@ -88,7 +95,7 @@ export class Worker extends Command {
setTimeout(() => { setTimeout(() => {
// In case that something goes wrong with shutdown we // In case that something goes wrong with shutdown we
// kill after max. 30 seconds no matter what // kill after max. 30 seconds no matter what
process.exit(Worker.processExistCode); process.exit(Worker.processExitCode);
}, maxStopTime); }, maxStopTime);
// Wait for active workflow executions to finish // Wait for active workflow executions to finish
@ -103,15 +110,13 @@ export class Worker extends Command {
); );
} }
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
await new Promise((resolve) => { await sleep(500);
setTimeout(resolve, 500);
});
} }
} catch (error) { } catch (error) {
LoggerProxy.error('There was an error shutting down n8n.', error); LoggerProxy.error('There was an error shutting down n8n.', error);
} }
process.exit(Worker.processExistCode); process.exit(Worker.processExitCode);
} }
async runJob(job: Queue.Job, nodeTypes: INodeTypes): Promise<Queue.JobResponse> { async runJob(job: Queue.Job, nodeTypes: INodeTypes): Promise<Queue.JobResponse> {
@ -258,8 +263,8 @@ export class Worker extends Command {
console.info('Starting n8n worker...'); console.info('Starting n8n worker...');
// Make sure that n8n shuts down gracefully if possible // Make sure that n8n shuts down gracefully if possible
process.on('SIGTERM', Worker.stopProcess); process.once('SIGTERM', Worker.stopProcess);
process.on('SIGINT', Worker.stopProcess); process.once('SIGINT', Worker.stopProcess);
// Wrap that the process does not close but we can still use async // Wrap that the process does not close but we can still use async
await (async () => { await (async () => {
@ -270,7 +275,7 @@ export class Worker extends Command {
const startDbInitPromise = Db.init().catch((error) => { const startDbInitPromise = Db.init().catch((error) => {
logger.error(`There was an error initializing DB: "${error.message}"`); logger.error(`There was an error initializing DB: "${error.message}"`);
Worker.processExistCode = 1; Worker.processExitCode = 1;
// @ts-ignore // @ts-ignore
process.emit('SIGINT'); process.emit('SIGINT');
process.exit(1); process.exit(1);
@ -441,7 +446,7 @@ export class Worker extends Command {
} catch (error) { } catch (error) {
logger.error(`Worker process cannot continue. "${error.message}"`); logger.error(`Worker process cannot continue. "${error.message}"`);
Worker.processExistCode = 1; Worker.processExitCode = 1;
// @ts-ignore // @ts-ignore
process.emit('SIGINT'); process.emit('SIGINT');
process.exit(1); process.exit(1);

View file

@ -0,0 +1,33 @@
import { existsSync } from 'fs';
import { mkdir, utimes, open, rm } from 'fs/promises';
import { join, dirname } from 'path';
import { UserSettings } from 'n8n-core';
import { ErrorReporterProxy as ErrorReporter, LoggerProxy, sleep } from 'n8n-workflow';
export const touchFile = async (filePath: string): Promise<void> => {
await mkdir(dirname(filePath), { recursive: true });
const time = new Date();
try {
await utimes(filePath, time, time);
} catch {
const fd = await open(filePath, 'w');
await fd.close();
}
};
const journalFile = join(UserSettings.getUserN8nFolderPath(), 'crash.journal');
export const init = async () => {
if (existsSync(journalFile)) {
// Crash detected
ErrorReporter.warn('Last session crashed');
LoggerProxy.error('Last session crashed');
// add a 10 seconds pause to slow down crash-looping
await sleep(10_000);
}
await touchFile(journalFile);
};
export const cleanup = async () => {
await rm(journalFile, { force: true });
};

View file

@ -6,7 +6,7 @@ import { ErrorReporterProxy } from 'n8n-workflow';
let initialized = false; let initialized = false;
export const initErrorHandling = (app?: Application) => { export const initErrorHandling = () => {
if (initialized) return; if (initialized) return;
if (!config.getEnv('diagnostics.enabled')) { if (!config.getEnv('diagnostics.enabled')) {
@ -27,15 +27,15 @@ export const initErrorHandling = (app?: Application) => {
}, },
}); });
if (app) {
const { requestHandler, errorHandler } = Sentry.Handlers;
app.use(requestHandler());
app.use(errorHandler());
}
ErrorReporterProxy.init({ ErrorReporterProxy.init({
report: (error, options) => Sentry.captureException(error, options), report: (error, options) => Sentry.captureException(error, options),
}); });
initialized = true; initialized = true;
}; };
export const setupErrorMiddleware = (app: Application) => {
const { requestHandler, errorHandler } = Sentry.Handlers;
app.use(requestHandler());
app.use(errorHandler());
};

View file

@ -155,7 +155,7 @@ import glob from 'fast-glob';
import { ResponseError } from './ResponseHelper'; import { ResponseError } from './ResponseHelper';
import { toHttpNodeParameters } from './CurlConverterHelper'; import { toHttpNodeParameters } from './CurlConverterHelper';
import { initErrorHandling } from './ErrorReporting'; import { setupErrorMiddleware } from './ErrorReporting';
require('body-parser-xml')(bodyParser); require('body-parser-xml')(bodyParser);
@ -259,7 +259,7 @@ class App {
this.presetCredentialsLoaded = false; this.presetCredentialsLoaded = false;
this.endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint'); this.endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint');
initErrorHandling(this.app); setupErrorMiddleware(this.app);
const urlBaseWebhook = WebhookHelpers.getWebhookBaseUrl(); const urlBaseWebhook = WebhookHelpers.getWebhookBaseUrl();
const telemetrySettings: ITelemetrySettings = { const telemetrySettings: ITelemetrySettings = {

View file

@ -33,7 +33,7 @@ import {
import config from '../config'; import config from '../config';
// eslint-disable-next-line import/no-cycle // eslint-disable-next-line import/no-cycle
import { WEBHOOK_METHODS } from './WebhookHelpers'; import { WEBHOOK_METHODS } from './WebhookHelpers';
import { initErrorHandling } from './ErrorReporting'; import { setupErrorMiddleware } from './ErrorReporting';
// eslint-disable-next-line @typescript-eslint/no-var-requires, @typescript-eslint/no-unsafe-call // eslint-disable-next-line @typescript-eslint/no-var-requires, @typescript-eslint/no-unsafe-call
require('body-parser-xml')(bodyParser); require('body-parser-xml')(bodyParser);
@ -219,7 +219,7 @@ class App {
this.presetCredentialsLoaded = false; this.presetCredentialsLoaded = false;
this.endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint'); this.endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint');
initErrorHandling(this.app); setupErrorMiddleware(this.app);
} }
/** /**

View file

@ -86,8 +86,8 @@ export class WorkflowRunnerProcess {
} }
async runWorkflow(inputData: IWorkflowExecutionDataProcessWithExecution): Promise<IRun> { async runWorkflow(inputData: IWorkflowExecutionDataProcessWithExecution): Promise<IRun> {
process.on('SIGTERM', WorkflowRunnerProcess.stopProcess); process.once('SIGTERM', WorkflowRunnerProcess.stopProcess);
process.on('SIGINT', WorkflowRunnerProcess.stopProcess); process.once('SIGINT', WorkflowRunnerProcess.stopProcess);
// eslint-disable-next-line no-multi-assign // eslint-disable-next-line no-multi-assign
const logger = (this.logger = getLogger()); const logger = (this.logger = getLogger());

View file

@ -7,6 +7,7 @@ import {
jsonParse, jsonParse,
NodeApiError, NodeApiError,
NodeOperationError, NodeOperationError,
sleep,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { DiscordAttachment, DiscordWebhook } from './Interfaces'; import { DiscordAttachment, DiscordWebhook } from './Interfaces';
@ -244,7 +245,7 @@ export class Discord implements INodeType {
// remaining requests 0 // remaining requests 0
// https://discord.com/developers/docs/topics/rate-limits // https://discord.com/developers/docs/topics/rate-limits
if (!+remainingRatelimit) { if (!+remainingRatelimit) {
await new Promise<void>((resolve) => setTimeout(resolve, resetAfter || 1000)); await sleep(resetAfter ?? 1000);
} }
break; break;
@ -255,7 +256,7 @@ export class Discord implements INodeType {
if (error.statusCode === 429) { if (error.statusCode === 429) {
const retryAfter = error.response?.headers['retry-after'] || 1000; const retryAfter = error.response?.headers['retry-after'] || 1000;
await new Promise<void>((resolve) => setTimeout(resolve, +retryAfter)); await sleep(+retryAfter);
continue; continue;
} }

View file

@ -10,6 +10,7 @@ import {
INodeTypeDescription, INodeTypeDescription,
NodeApiError, NodeApiError,
NodeOperationError, NodeOperationError,
sleep,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { OptionsWithUri } from 'request'; import { OptionsWithUri } from 'request';
@ -667,7 +668,7 @@ export class HttpRequestV1 implements INodeType {
const batchSize: number = const batchSize: number =
(options.batchSize as number) > 0 ? (options.batchSize as number) : 1; (options.batchSize as number) > 0 ? (options.batchSize as number) : 1;
if (itemIndex % batchSize === 0) { if (itemIndex % batchSize === 0) {
await new Promise((resolve) => setTimeout(resolve, options.batchInterval as number)); await sleep(options.batchInterval as number);
} }
} }

View file

@ -8,6 +8,7 @@ import {
INodeTypeDescription, INodeTypeDescription,
NodeApiError, NodeApiError,
NodeOperationError, NodeOperationError,
sleep,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { OptionsWithUri } from 'request'; import { OptionsWithUri } from 'request';
@ -701,7 +702,7 @@ export class HttpRequestV2 implements INodeType {
const batchSize: number = const batchSize: number =
(options.batchSize as number) > 0 ? (options.batchSize as number) : 1; (options.batchSize as number) > 0 ? (options.batchSize as number) : 1;
if (itemIndex % batchSize === 0) { if (itemIndex % batchSize === 0) {
await new Promise((resolve) => setTimeout(resolve, options.batchInterval as number)); await sleep(options.batchInterval as number);
} }
} }

View file

@ -9,6 +9,7 @@ import {
jsonParse, jsonParse,
NodeApiError, NodeApiError,
NodeOperationError, NodeOperationError,
sleep,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { OptionsWithUri } from 'request-promise-native'; import { OptionsWithUri } from 'request-promise-native';
@ -1002,7 +1003,7 @@ export class HttpRequestV3 implements INodeType {
if (itemIndex > 0 && batchSize >= 0 && batchInterval > 0) { if (itemIndex > 0 && batchSize >= 0 && batchInterval > 0) {
if (itemIndex % batchSize === 0) { if (itemIndex % batchSize === 0) {
await new Promise((resolve) => setTimeout(resolve, batchInterval)); await sleep(batchInterval);
} }
} }

View file

@ -1,4 +1,4 @@
import { IDataObject, IExecuteFunctions, ITriggerFunctions } from 'n8n-workflow'; import { IDataObject, IExecuteFunctions, ITriggerFunctions, sleep } from 'n8n-workflow';
import * as amqplib from 'amqplib'; import * as amqplib from 'amqplib';
@ -138,9 +138,7 @@ export class MessageTracker {
// when for example a new version of the workflow got saved. That would lead to // when for example a new version of the workflow got saved. That would lead to
// them getting delivered and processed again. // them getting delivered and processed again.
while (unansweredMessages !== 0 && count++ <= 300) { while (unansweredMessages !== 0 && count++ <= 300) {
await new Promise((resolve) => { await sleep(1000);
setTimeout(resolve, 1000);
});
unansweredMessages = this.unansweredMessages(); unansweredMessages = this.unansweredMessages();
} }

View file

@ -13,6 +13,7 @@ import {
INodeExecutionData, INodeExecutionData,
NodeApiError, NodeApiError,
NodeOperationError, NodeOperationError,
sleep,
} from 'n8n-workflow'; } from 'n8n-workflow';
export async function twitterApiRequest( export async function twitterApiRequest(
@ -193,12 +194,7 @@ export async function uploadAttachments(
// data has not been uploaded yet, so wait for it to be ready // data has not been uploaded yet, so wait for it to be ready
if (response.processing_info) { if (response.processing_info) {
const { check_after_secs } = response.processing_info as IDataObject; const { check_after_secs } = response.processing_info as IDataObject;
await new Promise((resolve, _reject) => { await sleep((check_after_secs as number) * 1000);
setTimeout(() => {
// @ts-ignore
resolve();
}, (check_after_secs as number) * 1000);
});
} }
media.push(response); media.push(response);

View file

@ -18,7 +18,7 @@ export * from './WorkflowErrors';
export * from './WorkflowHooks'; export * from './WorkflowHooks';
export * from './VersionedNodeType'; export * from './VersionedNodeType';
export { LoggerProxy, NodeHelpers, ObservableObject, TelemetryHelpers }; export { LoggerProxy, NodeHelpers, ObservableObject, TelemetryHelpers };
export { deepCopy, jsonParse } from './utils'; export { deepCopy, jsonParse, sleep } from './utils';
export { export {
isINodeProperties, isINodeProperties,
isINodePropertyOptions, isINodePropertyOptions,

View file

@ -64,3 +64,8 @@ export const jsonParse = <T>(jsonString: string, options?: JSONParseOptions<T>):
throw error; throw error;
} }
}; };
export const sleep = async (ms: number): Promise<void> =>
new Promise((resolve) => {
setTimeout(resolve, ms);
});