Поток — это концепция, которая была сначала реализована в UNIX системах для передачи данных из одной программы в другую в операциях ввода/вывода. Это позволяет каждой программе быть очень специализированной в том, что она делает — быть независимым модулем. Сочетание таких простых программ помогает в создании более сложных систем путем «объединения» их в цепочку вызовов.
Потоки позволяют обмениваться данными небольшими частями, что в свою очередь дает возможность в своей работе не расходовать много памяти. Конечно, это зависит от того, как вы реализуется внутренний функционал потока.
Распространенная задача — парсинг файла большого объема. Например, в текстовом файле с данными логов нужно найти строку, содержащую определенный текст. Вместо того, чтобы файл полностью загрузить в память, и потом начать разбирать в нем строки в поисках нужной, мы можем его считывать небольшими порциями. Тем самым не занимаем память сверх необходимого, а лишь столько памяти, сколько нужно для буферизации считанных данных. Как только найдем требуемую запись, сразу прекратим дальнейшую работу. Или можем передать найденную запись в другой поток по цепочке, например, для преобразование в другой формат, или сохранения в другой файл.
Модуль stream предоставляет базовый API по работе с потоками в Node.JS. Документации Node.JS вполне достаточно, чтобы разобраться в данном вопросе, но мы попытаемся составить что-то вроде шпаргалки с пояснениями некоторых моментов.
Readable.pipe(Writable);//например, по "схеме" DataBase -> File
Readable.pipe(Transform).pipe(Writable);//DataBase -> преобразовать в JSON формат -> сохранить JSON в File
Duplex.pipe(Transform).pipe(Duplex);//прочитать из DataBase -> обработать -> записать обратно в DataBase результат
new StreamObject({objectMode: false, highWaterMark: кол_во_байт}); //по умолчанию 16384 (16kb)
new StreamObject({objectMode: true, highWaterMark: кол_во_объектов});//по умолчанию 16
'use strict';
const { Readable } = require('stream');
/**
* чтобы реализовать свой класс Readable потока, необходимо имплементировать метод _read().
* именно с нижним подчеркиванием перед именем
* сравните состояние потока (_readableState) во время инициализации, и по окончании чтения данных (on('end', ()=>{}))
*/
class Source extends Readable
{
constructor(array_of_data = [], opt = {})
{
super(opt);
this._array_of_data = array_of_data;
console.log('objectMode ', this._readableState.objectMode);//false по умолчанию, если не задано явно другое
console.log('highWaterMark ', this._readableState.highWaterMark);//16384
console.log('buffer ', this._readableState.buffer);//[] - пустой массив
console.log('length ', this._readableState.length);//0 - кол-во буфер объектов
console.log('flowing ', this._readableState.flowing);//null
//для краткости примеров, добавим обработчики событий в конструкторе
this.on('data', (chunk)=>
{
//при обработке события 'data' - данные считываются из буфера и удаляются из него
console.log('\n---');
console.log('Readable on data ');
//здесь chunk данные в виде буфера
console.log(`chunk = ${chunk} chunk isBuffer ${Buffer.isBuffer(chunk)} and chunk.length is ${chunk.length}`);
//кол-во данных в текущем буфере (кол-во буфер объектов)
console.log('buffer.length ', this._readableState.buffer.length);
console.log('данные: ', chunk.toString(), ' buffer of chunk ', this._readableState.buffer, ' buffer of chunk как строка ', this._readableState.buffer.toString());
})
.on('error',(err)=>
{
console.log('Readable on error ', err);
})
.on('end',()=>
{
console.log('Readable on end ');
console.log('objectMode ', this._readableState.objectMode);//false
console.log('highWaterMark ', this._readableState.highWaterMark);//16384
console.log('buffer ', this._readableState.buffer);//[] - пустой массив
console.log('buffer.length ', this._readableState.buffer.length);//0
console.log('flowing ', this._readableState.flowing);//true !!!так как у нас есть обработчик события 'data'
})
.on('close',()=>
{
console.log('Readable on close не все реализации генерируют это событие');
});
}
_read()
{
let data = this._array_of_data.shift()
if (!data) {
//сообщаем, что данные закончились
this.push(null);
} else {
this.push(data);
}
}
}
/*значение именно как строки, т.к. по умолчанию потоки работают либо со строками, либо с буфером. иначе будет выброшено сообщение об ошибке по время this.push(data) Readable on error TypeError: Invalid non-string/buffer chunk */
let array_of_data = ['1', '2', '3', '4', '5'];
let opts = {/* значения свойств по умолчанию */};
const R = new Source(array_of_data, opts);
array_of_data = ['1', '2', '3', '4', '5'];
opts = {
objectMode: false,
highWaterMark: 1//1 байт лимит для буферизации данных _readableState.buffer.length будет === 1
};
const R2 = new Source(array_of_data, opts);
array_of_data = ['1', '2', '3', '4', '5'];
opts = {
objectMode: false
, encoding: 'utf8'//если задать кодировку (поддерживаемую NodeJS), то поток будет работать с данными как со строками, а не как с буфером
};
const R3 = new Source(array_of_data, opts);//кодировку так же можно задать с помощью метода .setEncoding('utf8')
array_of_data = [1, 2, 3, 4, 5];
/*при таких "настройках" потока будет ошибка. если objectMode: true то не надо указывать кодировку - ни в параметрах, ни через метод Readable.setEncoding('utf8')*/
opts = {
objectMode: true
, encoding: 'utf8'
};
const R4 = new Source(array_of_data, opts);
//при objectMode: true можно передать как строки, или как числа (Number)
array_of_data = [1, 2, 3, 4, 5];
opts = {
objectMode: true
};
const R5 = new Source(array_of_data, opts); //highWaterMark 16 - значение по умолчанию для объектов
/*имитируем задержку при чтении данных (подобное может происходить при Writable.write(someData) === false). пример ниже взят из документации Node.JS.
выполните код, и увидите как данные прекращаются считываться, они накапливаются в буфере, а потом продолжают считываться*/
array_of_data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
opts = {
objectMode: true
};
const R6 = new Source(array_of_data, opts);
R6.on('data', (chunk) => {
//приостанавливаем передачу данных на 1 секунду
R6.pause();
setTimeout(() => {
R6.resume();//возобновим работу потока
}, 1000);
});
'use strict';
const Source = require('./readable.js');
const { Writable } = require('stream');
class Writer extends Writable
{
constructor(opt = {})
{
super(opt);
console.log('objectMode ', this._writableState.objectMode);//false по умолчанию, если не задано явно true
console.log('highWaterMark ', this._writableState.highWaterMark);//16384
console.log('decodeStrings ', this._writableState.decodeStrings);//true по умолчанию; пеобразовывать ли в Buffer данные, до их передачи в метод _write()
console.log('buffer ', this._writableState.getBuffer());//[] - пустой массив
this.on('drain', ()=>
{
console.log('\n------ writable on drain');
})
.on('error', (err)=>
{
console.log('\n------ writable on error', err);
})
.on('finish', ()=>
{
console.log('\n------ writable on finish');
console.log('_writableState.getBuffer()', this._writableState.getBuffer());
});
}
/**
* @param chunk - строка|буфер|объект
* @param encoding - кодировка поступающих данных. если objectMode === true, значение encoding будет игнорироваться
* @param done - callback ф-ция. ее удобнее именовать именно так, потому что вы ее вызываете, когда по логике
* вашего метода _write, нужно сообщить, что завершили запись текущей части данных chunk, и готовы принять на запись
* следующую часть: done(err) - можно передать объект ошибки new Error(...)
* @private
*/
_write(chunk, encoding, done)
{
console.log('_writableState.getBuffer()', this._writableState.getBuffer());
console.log(typeof chunk );
//для пример с потоком Transform см ниже
if (typeof chunk === 'object') {
console.log('chunk = ', chunk.get(), chunk.get() +' in pow '+ chunk.get() +' = '+ chunk.inPow(chunk.get()));
} else {
console.log(`chunk = ${chunk}; isBuffer ${Buffer.isBuffer(chunk)}; chunk.length is ${chunk.length}; encoding = ${encoding}`);
}
/* Пример с ошибкой оставим закомментированным.
Добавим, что:
1) всегда добавляйте обработчик ошибок on('error', (err)=>{...})
2) если выбрасывается ошибка, то поток данных Readable не прекращает свою работу.
в этом слувае вам надо обрабатывать эту ситуацию - например, вызывать Readable.emit('error', err);
и прекращать читать данные Readable.puse(), после обработки ошибки продолжить работу Readable.remuse().
Это в общем случае, и все зависит от ваших задач при работе с потоками
//if (chunk > 3) return done(new Error('chunk > 3'));*/
done();
}
}
let array_of_data = ['1', '2', '3', '4', '5'];
let r_opts = {/* значения по умолчанию */};
const R = new Source(array_of_data, r_opts);
let w_opts = {/* значения по умолчанию */};
const W = new Writer(w_opts);
R.pipe(W);
array_of_data = ['1', '2', '3', '4', '5'];
r_opts = {encoding: 'utf8'};
const R1 = new Source(array_of_data, r_opts);
w_opts = {
decodeStrings: false//данные в _write будут строками в кодировке 'utf8', так как данные из источника - строки ( см r_opts),
};
const W1 = new Writer(w_opts);
R1.pipe(W1);
array_of_data = [1, 2, 3, 4, 5];
r_opts = {objectMode: true};
const R2 = new Source(array_of_data, r_opts);
w_opts = {
objectMode: true//если false, то при записи данных как объектов (см r_opts), будет ошибка "TypeError: Invalid non-string/buffer chunk"
};
const W2 = new Writer(w_opts);
R2.pipe(W2);
array_of_data = [1, 2, 3, 4, 5];
r_opts = {objectMode: true};
const R3 = new Source(array_of_data, r_opts);
w_opts = {
objectMode: true//если false, то при записи данных как объектов (см r_opts), будет ошибка "TypeError: Invalid non-string/buffer chunk"
, highWaterMark: 1 //ограничем буфер; при таком маленьком значении каждый раз будет вызываться событие 'drain'
};
const W3 = new Writer(w_opts);
R3.pipe(W3);
//Вариант без pipe()
const R3_1 = new Source(array_of_data, r_opts);
const W3_1 = new Writer(w_opts);
R3_1.on('data', (chunk)=> {
//R3_1._readableState.flowing === true
console.log('R3_1 in flowing mode', R3_1._readableState.flowing, 'R3_1 _readableState.buffer', R3_1._readableState.buffer);
toWriteOrNotToWriteThatIsTheQuestion(chunk, onDrain);
});
function onDrain() {
//R3_1._readableState.flowing === false, так как был вызван метод R3_1.pause() см toWriteOrNotToWriteThatIsTheQuestion
console.log('R3_1 in flowing mode', R3_1._readableState.flowing);
R3_1.resume();
}
/**
* если на данный момент не можем больше писать в поток Writable, нужно оставноить и получение данных из Readable (R3_1.pause())
* как только буфер очистится (событие 'drain'), мы продолжаем читать данные из источника Readable (см cb R3_1.resume(); ), и записывать в Writable
* @param data
* @param cb
*/
function toWriteOrNotToWriteThatIsTheQuestion(data, cb)
{
//во "внешнем коде" записывать данные через метод write(...), а не через _write(...)
if (!W3_1.write(data)) {
R3_1.pause();
W3_1.once('drain', cb);
} else {
process.nextTick(cb);
}
}
'use strict';
const Readable = require('./readable.js');
const Writable = require('./writable.js');
const {Transform} = require('stream');
/*для примера того, что можем передавать не только строки, буфер, простые JS объекты,
но и экземпляры классов*/
class Chunk
{
constructor(chunk)
{
this.set(chunk);
}
set(chunk)
{
this._chunk = chunk;
}
get()
{
return this._chunk;
}
inPow(pow = 2)
{
return Math.pow(this.get(), pow);
}
}
class Transformer extends Transform
{
constructor(opt = {})
{
super(opt);
console.log('\n -------- Transform in constructor');
console.log('objectMode ', this._writableState.objectMode);//false по умолчанию, если не задано явно true
console.log('highWaterMark ', this._writableState.highWaterMark);//16384
console.log('decodeStrings ', this._writableState.decodeStrings);//true по умолчанию; пеобразовывать ли в Buffer данные, до их передачи в метод _write()
console.log('buffer ', this._writableState.getBuffer());//[] - пустой массив
this.on('close', ()=>
{
console.log('\n------ Transform on close');
})
.on('drain', ()=>
{
console.log('\n------ Transform on drain');
})
.on('error', (err)=>
{
console.log('\n------ Transform on error', err);
})
.on('finish', ()=>
{
console.log('\n------ Transform on finish');
})
.on('end', ()=>
{
console.log('\n------ Transform on end');
})
.on('pipe', ()=>
{
console.log('\n------ Transform on pipe');
});
}
/**
* метод, реализующий в себе запись данных (chunk поступают в поток Transform),
* и чтение данных - когда другой поток читает из Transform
* @param chunk
* @param encoding
* @param done - в общем случае done(err, chunk)
* @private
*/
_transform(chunk, encoding, done)
{
/*завершить обработку текущих данных chunk, и передать дальше на чтение можно двумя вариантами
done(null, chunk);
done(err, chunk); - в этом случае будет вызвано событие error
или так, что то же самое:
this.push(chunk);
done();
this.push(chunk);
done(err);*/
//преобразовали выходные данные в экземпляр класса Chunk (см. пример writable.js)
this.push(new Chunk(chunk));
done();
}
/**
* Кастомные transform потоки могут реализовать метод _flush.
Он будет вызван, когда нет больше данных на запись, но перед событием 'end' потока Readable (имеется ввиду Transform, так как это поток и на запись, и на чтение данных).
* @param done - done(err) можно передать объект ошибки Error
* @private
*/
_flush(done)
{
//TODO ... что-нибудь сделали дополнительно перед завершением работы потока
done();
}
}
let array_of_data = ['1', '2', '3', '4', '5'];
let r_opts = {
encoding: 'utf8'
};
const R = new Readable(array_of_data, r_opts);
let t_opts = {
readableObjectMode: true //читать из потока Transform будут объекты
, writableObjectMode: false//записывать в поток Transform можно либо строки или буфер
, decodeStrings: false
};
const T = new Transformer(t_opts);
let w_opts = {
objectMode: true//если false, будет выброшена ошибка
};
const W = new Writable(w_opts);
R.pipe(T).pipe(W);
К сожалению, не доступен сервер mySQL