Знаешь, что общего между разработчиком, который грузит 10 гигабайт JSON в память одним fs.readFile(), и человеком, который пытается носить воду решетом? Оба получат проблемы, которые потом будут долго чинить. Но есть способ проще — ReadableStream и правильная архитектура. В этой статье разберемся, почему большинство middleware в Express — это костыль для обработки данных, и как его убрать.
Основная идея простая: не держи всё в памяти сразу. Вместо этого обрабатывай данные порциями, как их получаешь. Звучит скучно? На практике это означает, что твой сервер сможет обрабатывать файлы, которые больше доступной RAM, без свопа и паузы. Давай посмотрим, как это работает и почему это гораздо лучше, чем все эти bodyParser и multer.
Почему middleware раздували как шарик
В Express всё построено на предположении, что данные в запросе — это что-то маленькое. Приходит JSON из фронтенда на 5 килобайт, middleware кладет его целиком в объект req.body, и ты обрабатываешь. Красиво и просто для «нормальных» случаев. Но когда нужно залить в API 10 гигабайт CSV-логов или обработать большой архив JSON-записей, эти middleware начинают жрать память как сумасшедшие.
Проблема не в самом Express — она в паттерне «сначала всё прочитаем, потом обработаем». Серверу нужно:
- загрузить весь payload в буфер (уходит 10+ гигабайт RAM)
- распарсить его полностью (ещё память)
- только потом начать что-то делать
Итоговый результат: сервер либо падает с OutOfMemory, либо становится черепахой из-за свопа на диске. И вот разработчик добавляет middleware для сжатия, middleware для ограничения размера, middleware для деления на чанки… и получается костыльная башня вместо архитектуры.
Правильный подход: обрабатывай данные во время их поступления, не накапливая в памяти. Node.js Streams — это именно для этого.
Как работает ReadableStream и почему это магия
ReadableStream в Node.js — это не просто интерфейс для чтения файлов. Это контролируемое потокование данных с встроенной системой управления буфером и обратного давления (backpressure). Когда говорят о streams, часто описывают их как абстракцию — скучно и непонятно. Давай по факту.
Потоку нужно помнить, что данные поступают быстро, а потребитель может быть медленнее. Вот тут включается highWaterMark — это лимит на буфер данных. По умолчанию это 16 килобайт для byte mode. Когда буфер переполняется, stream перестает читать из источника и ждет, пока потребитель обработает уже накопленные данные. Это называется backpressure — механизм, который защищает твой сервер от утечек памяти.
ReadableStream начинается в paused mode — данные накапливаются в буфере, но ничего не происходит. Как только ты прикрепишь listener data или вызовешь .resume(), stream переходит в flowing mode и начинает отправлять чанки. Это разделение режимов позволяет разработчику выбрать, как контролировать процесс.
| Режим |
Поведение |
Когда использовать |
| Paused |
Данные накапливаются в буфере, ты вызываешь .read() |
Когда нужен полный контроль над темпом |
| Flowing |
Stream сам отправляет чанки через callbacks |
Когда нужно просто пробросить данные дальше |
Вот пример, который обрабатывает 10 гигабайт JSON-массива без загрузки в память:
import { pipeline } from 'stream/promises';
import { createReadStream } from 'fs';
import { parser } from 'stream-json';
import { streamArray } from 'stream-json/streamers/StreamArray.js';
async function processLargeJSON(filePath, processFn) {
const records = [];
await pipeline(
createReadStream(filePath),
parser(),
streamArray(),
async function* (source) {
for await (const { value } of source) {
await processFn(value);
yield value;
}
}
);
}
// Используем
await processLargeJSON('dataset.json', async (record) => {
await db.insert(record); // Каждая запись обработана и очищена
});
Видишь разницу? Не копируем в req.body, не ждем парсинга всего файла. Каждая запись обрабатывается отдельно и тут же забывается. Память не растет — она висит на одном уровне, потому что GC успевает очищать обработанные объекты.
Как это выглядит без middleware
Теперь вот момент истины — как это внедрить в Express, чтобы не использовать стандартные middleware. Главное правило: потокование начинается с самого роута, не в middleware.
Традиционный путь выглядит так:
app.post('/upload', express.json({ limit: '50mb' }), (req, res) => {
// Здесь req.body уже лежит в памяти целиком
const data = req.body;
// ...
});
А правильный путь — это взять req как ReadableStream и работать с ним напрямую:
app.post('/upload', async (req, res) => {
try {
await pipeline(
req, // req это ReadableStream
parser(),
streamArray(),
new Writable({
objectMode: true,
async write(record, encoding, callback) {
await db.insert(record);
callback();
}
})
);
res.json({ success: true });
} catch (err) {
res.status(400).json({ error: err.message });
}
});
Какая разница? Огромная. Middleware express.json() парсит весь body и кладет в req.body — это значит, что вся полезная нагрузка лежит в памяти. Потоковый подход парсит по кусочкам и сразу же обрабатывает. Первый вариант упадет при 10GB, второй обработает это за несколько минут, не потея.
Ключевые преимущества потокового подхода:
- Память не растет, остается примерно на одном уровне (буфер + внутренние структуры парсера)
- Можно обрабатывать файлы больше доступной RAM
- Нет паузы ожидания парсинга — обработка идет параллельно чтению
- Легче реализовать отмену операции (abort stream)
- Исходно это то, на что рассчитан Node.js
CSV и другие форматы: та же история
IDea работает не только для JSON. CSV, протобуфы, нджейсон (newline-delimited JSON) — для всего есть потоковые парсеры. Экосистема npm богата:
csv-parse для CSV — парсит строка за строкой
ndjson для NDJSON — каждую строку отдельно
stream-json для JSON (уже видели)
protobufjs с потоковым режимом для бинарных данных
Принцип везде один: читаешь чанк, парсишь его кусок, обрабатываешь, забываешь. RAM остается чистой.
Сравним расход памяти на обработке одного гигабайта:
| Подход |
Пик памяти |
Стабильность |
Время обработки |
fs.readFile() + парсинг |
~1.2GB |
Падает при >RAM |
30+ сек (+ GC паузы) |
| Express middleware |
~1.5GB |
Частые фризы |
40+ сек |
| Потокование |
~80MB |
Стабильно |
10-15 сек |
Цифры примерные, но порядок ясен. Потоковый подход побеждает везде.
Когда потоки — это overkill
Шу, важный момент, чтобы не потом обвинять меня. Потоки — отличный инструмент, но не универсальный. Если ты обрабатываешь маленький JSON на 5 килобайт, потоки добавят только overhead. fs.readFile() здесь честно быстрее. Потоки имеют смысл, когда:
- Данные больше нескольких мегабайт
- Нужна константная, предсказуемая память
- Нельзя ждать, пока всё прочитается перед обработкой
- Обработка асинхронная (запись в БД, вызовы API)
Если это не про тебя — не усложняй, юзай простое чтение.
Практические подводные камни
Когда начинаешь писать код с потоками, есть детали, на которые легко наступить.
Backpressure — это не просто словечко. Если ты создаешь Writable stream и обработка медленная (например, INSERT в БД), нужно обязательно учитывать сигналы backpressure. Если ты читаешь быстрее, чем пишешь, буфер переполнится, и данные потеряются.
const writable = fs.createWriteStream('output.txt');
const readable = fs.createReadStream('input.txt');
// НЕПРАВИЛЬНО
readable.on('data', (chunk) => {
writable.write(chunk); // Игнорируем возвращаемое значение
});
// ПРАВИЛЬНО
readable.pipe(writable); // pipe автоматически управляет backpressure
// ИЛИ
readable.on('data', (chunk) => {
if (!writable.write(chunk)) {
readable.pause(); // Если буфер переполнен, паузируем чтение
}
});
writable.on('drain', () => {
readable.resume(); // Когда буфер освободился, возобновляем
});
Это не магия, это базовая механика потоков, но многие забывают.
Error handling — потоки могут сломаться на любом этапе. Если не ловить ошибки, процесс упадет. pipeline() из stream/promises помогает, потому что отлавливает ошибки во всех стадиях.
Чанк-ориентированность — иногда парсер может разбить логический элемент на несколько чанков. Если ты пишешь собственный Transform stream, нужно помнить, что входные данные могут приходить кусочками, а не целыми объектами.
Архитектура без middleware фальш-сложности
Когда убираешь middleware типа bodyParser и multer, роуты становятся проще, но нужна структура. Вот как можно организовать:
// Utility для парсинга JSON потока
async function parseJSONStream(readableStream) {
const items = [];
await pipeline(
readableStream,
parser(),
streamArray(),
new Writable({
objectMode: true,
write(item, encoding, callback) {
items.push(item);
callback();
}
})
);
return items;
}
// Роут
app.post('/import', async (req, res, next) => {
try {
const data = await parseJSONStream(req);
// Обработка
res.json({ imported: data.length });
} catch (err) {
next(err);
}
});
Вместо того чтобы навешивать middleware, создаешь utility-функции для специфических задач. Это чище, понятнее и проще тестировать. Каждая функция отвечает за одно — парсинг JSON, CSV, валидацию и так далее.
Что получаешь:
- Нет загадочного middleware, который где-то трансформирует данные
- Полный контроль над обработкой в каждом роуте
- Легко добавлять специфическую логику без введения нового middleware
- Проще дебажить (ошибки приходят именно туда, где их создали)
- Меньше dependencies в package.json
Это не значит, что все middleware плохие. Но middleware для парсинга тела запроса — это именно тот случай, когда потоковый подход работает лучше и дает больше контроля.
От теории к действию: что осталось за кадром
Мы разобрали механику, но есть детали, которые влияют на реальную производительность. Например, размер highWaterMark. По умолчанию это 16 килобайт, но ты можешь его менять. Для больших файлов имеет смысл увеличить до 64 или 256 килобайт — это снизит количество context switches в event loop. Но это не серебряная пуля; нужно профилировать на своих данных.
Также не забывай про compression. Если клиент отправляет gzip-сжатые данные, нужно распаковать перед парсингом. zlib в Node.js справляется отлично и тоже работает через потоки. Комбинация createReadStream -> zlib.createGunzip() -> парсер потребит минимум памяти даже для очень больших файлов.