Custom streamlar yozamiz(Nodejs).

Habibov Ulug'bek
7 min readAug 11, 2024

--

Assalamu Alaykum bugun Nodejsda custom streamlar yozishni ko`rib o`tamiz. Bu maqolani o`qishni boshlashdan oldin oldingi 2 maqolamini o`qishni tavsiya qilaman (streamlar haqida ) .

  1. Stream nima? Nodejsda streamlar bilan ishlash.
  2. Streamni benchmark qilamiz (nodejs).

Birinchi bo`lib custom stream yozishning yaxshi taraflarini ko`rib chiqsak Nodejs bizga shunchaki streamlarni ishlatishni emas ularni o`zimiz xohlagandek foydalanish uchun o`zgartirishni(hamma narsani emas) ham taklif etadi .Buning yordamida biz o`zimizga kerakli bo`lgan aynan bizni muammoni yechadigan streamlarni yarata olamiz.

Custom Writable Stream

Biz hozir custom FileWritableStream yozamiz. Bu streamning asosiy maqsadi biz oldin fs orqali faylni ochib stream yaratmasdan birdaniga fayl nomini berish orqali streamni yo`lga qo`yishdan iborat .Unda qani boshladik :) .

Unda custom stream yozish uchun birinchi bo`lib node:stream modulidan Writable streamni import qilib olamiz va yaratmoqchi bo`lgan streamda undan meros olamiz.

const { Writable } = require("node:stream");

class FileWritableStream extends Writable {
constructor({ highWaterMark, ...options }) {
super({ highWaterMark });
// ...
}
}

Bu yerda options berilganda juda etiborli bo`lishimiz zarur chunki stream optionlarda autoDestroy va shu kabi boshqa tanlovlar bor va buni shu klass foydalanuvchisi tomonidan qayta yozilishi stream o`zini o`chirishi yoki qaysidir eventlarni ishga tushurishini oldi olinishi mumkin. Writable streamdan meros olgandan so`ng biz undagi write va destroy metodlarini o`zimizga moshlab qayta yozishimiz zarur bu custom stream yozishdagi (nodejsda asosiy qoida) .

Endi Custom streamni o`zimiz xohlayotgan holatga keltiramiz (kerakli propertylarni yaratamiz ) :

class FileWritableStream extends Writable {
constructor({ highWaterMark, fileName }) {
super({ highWaterMark });

this.fileName = fileName;
this.fileDescriptor = null;
this.chunks = [];
this.chunksSize = 0;
this.writesCount = 0;
}
}

bu yerda biz fayl nomini klass foydalanuvchisi tomonidan olinishini yo`lga qo`ydik endi _construct metodi orqali fileni ochib olib va uni o`zgaruvchilarga biriktiramiz. _construct metodi constructordan keyin avtomat ishga tushadi va barcha boshqa metodlarni pendingda ushlab turadi . Qachonki callbackni chaqirganimzdan so`ng u boshqa metodlar bilan ishlashni boshlaydi (callback Nodejs tomonidan taqdim qilingan) .
Agarda callback biror bir argumentsiz chaqirilsa bu biz ishlashda davom etishimiz mumkin hammasi yaxshi degani agarda u argument bilan chaqirilsa bu error bo`ladi va bu orqali hamma narsa to`xtatiladi.

class FileWritableStream extends Writable {
constructor({ highWaterMark, fileName }) {
super({ highWaterMark });

this.fileName = fileName;
this.fileDescriptor = null;
this.chunks = [];
this.chunksSize = 0;
this.writesCount = 0;
}

//it method runs after constructor and before starting write
_construct(callback) {
fs.open(this.fileName, "w", (err, fd) => {
if (err) {
callback(err); // exits stream and gives a error
} else {
this.fileDescriptor = fd;
//no arguments means it was succesful
callback(); // until I call this callback other metods waits me
}
});
}
}

Xo`sh endi _write metodini yozamiz.

_write(chunk, encoding, callback) {
this.chunks.push(chunk);
this.chunksSize += chunk.length;

if (this.chunksSize >= this.writableHighWaterMark) {
fs.write(this.fileDescriptor, Buffer.concat(this.chunks), (err) => {
if (err) {
return callback(err);
}
this.chunks = [];
this.chunksSize = 0;
++this.writesCount;
callback(); // it emits drain event
});
} else {
callback();
}
}

Bu yerda ko`rib turganimizdek callback funksiya ishlatilmoqda bu agar chunk hajmi maksimal berilgan qiymatdan o`tib ketgan holatda filega ma’lumotlar yozilib callback funksiyasi chaqiriladi va u drain eventni ishga tushuradi ya’ni xotira bo`sh yozishda davom etish mumkin deganday.

Va bizda keyingi metod bu _final bu metod bizga buffer to`lmay qolgan ammo oxirgi chunklar yig`ilgan ma’lumotni filega yozishga yordam beradi.

// it only called when end function called
_final(callback) {
fs.write(this.fileDescriptor, Buffer.concat(this.chunks), (err) => {
if (err) {
return callback(err);
}
this.chunks = [];
this.chunksSize = 0;
++this.writesCount;
callback(); //it will emit finish event and then executes destroy method
});
}

Va callback metodi chaqirilgandan so`ng u finish event ishga tushuradi va destroy metodini ishga tushadi .

Endi destroy metodini yozamiz:

destroy(error, callback) {
console.log("Number of writes : ", this.writesCount);
if (this.fileDescriptor) {
fs.close(this.fileDescriptor, (err) => {
//if error happens never throw a error you should pass this to callback all the time
callback(err || error);
});
this.fileDescriptor = null;
} else {
callback(error);
}
}

Bu metod orqali ochilgan fayl descriptor orqali yopiladi .

Errorlar hech qachon throw qilinmaydi faqatgina callback ichiga berib yuborilsa kifoya.

Endi umumiy FileWritableStreamni ko`ramiz:

const { Writable } = require("node:stream");
const fs = require("node:fs");

class FileWritableStream extends Writable {
constructor({ highWaterMark, fileName }) {
super({ highWaterMark });

this.fileName = fileName;
this.fileDescriptor = null;
this.chunks = [];
this.chunksSize = 0;
this.writesCount = 0;
}

//it method runs after constructor and before starting write
_construct(callback) {
fs.open(this.fileName, "w", (err, fd) => {
if (err) {
callback(err); // exits stream and gives a error
} else {
this.fileDescriptor = fd;
//no arguments means it was succesful
callback(); // until I call this callback other metods waits me
}
});
}

_write(chunk, encoding, callback) {
this.chunks.push(chunk);
this.chunksSize += chunk.length;

if (this.chunksSize >= this.writableHighWaterMark) {
fs.write(this.fileDescriptor, Buffer.concat(this.chunks), (err) => {
if (err) {
return callback(err);
}
this.chunks = [];
this.chunksSize = 0;
++this.writesCount;
callback(); // it emits drain event
});
} else {
callback();
}
}

// it only called when end function called
_final(callback) {
fs.write(this.fileDescriptor, Buffer.concat(this.chunks), (err) => {
if (err) {
return callback(err);
}
this.chunks = [];
this.chunksSize = 0;
++this.writesCount;
callback(); //it will emit finish event and then executes destroy method
});
}

_destroy(error, callback) {
console.log("Number of writes : ", this.writesCount);
if (this.fileDescriptor) {
fs.close(this.fileDescriptor, (err) => {
//if error happens never throw a error you should pass this to callback all the time
callback(err || error);
});
this.fileDescriptor = null;
} else {
callback(error);
}
}
}

Endi bu yozilgan streamni ishlatib ko`ramiz:

const stream = new FileWritableStream({
highWaterMark: 1800,
fileName: "output.txt",
});
stream.write(Buffer.from("hello"));
stream.end(Buffer.from("Good bye"));

stream.on("finish", () => {
console.log("finished");
});
custom stream ishlatish natijasi

Custom Readable Stream

Endi Writable streamda qilganimizdek FileReadableStream yozamiz .

Bu Writable streamdan faqatgina _write emas _read metodi orqali farq qiladi .

const { Readable } = require("node:stream");
const fs = require("node:fs");

class FileReadStream extends Readable {
constructor({ highWaterMark, fileName }) {
super({ highWaterMark });
this.fileName = fileName;
this.fileDescriptor = null;
}

_construct(callback) {
fs.open(this.fileName, "r", (err, fd) => {
if (err) {
return callback(err);
}
this.fileDescriptor = fd;
callback();
});
}
// there is no callbakc because it knows state from pushing
_read(size) {
const buff = Buffer.alloc(size);
fs.read(this.fileDescriptor, buff, 0, size, null, (err, bytesRead) => {
if (err) return this.destroy(err);
this.push(bytesRead > 0 ? buff.subarray(0, bytesRead) : null); // => why we subarraying cause maybe in buff <23 23 43 54 0 0 0 > bytesRead will be 4 here and we can remove 0s while pushing internal buffer
});

// this.push(null); //if you push null to read stream it means we are done reading
}

_destroy(error, callback) {
if (this.fileDescriptor) {
fs.close(this.fileDescriptor, (err) => callback(err || error));
} else {
callback(error);
}
}
}

Bu yerda read metodida callback funksiya yoq chunki unda qaysi holatdaligini push orqali bilib olinadi. Agarda pushda biror malumot uzatilsa demak u o`qiyapti agarda null uzatilsa bu degani stream o`qishni tugatdi bizda boshqa ma’lumot qolmadi degani.

Bu streamdan foydalanish :


const stream = new FileReadStream({ fileName: "input.txt" });

stream.on("data", (chunk) => {
console.log(chunk.toString());
});

stream.on("end", () => {
console.log("Stream is done reading the file");
});
custom readable stream natijasi

Custom Duplex Stream

Biz Duplex streamda xuddi tepada qilgan write and read streamagi holatni birlashtiramiz chunki bu ikkalasi ham Duplex streamda mavjud.

const { Duplex } = require("node:stream");
const fs = require("node:fs");

class CustomDuplexStream extends Duplex {
constructor({
writableHighWaterMark,
readableHighWaterMark,
readFileName,
writeFileName,
}) {
super({ readableHighWaterMark, writableHighWaterMark });
this.readFileName = readFileName;
this.writeFileName = writeFileName;
this.readFileDescriptor = null;
this.writeFileDescriptor = null;
this.chunks = [];
this.chunksSize = 0;
}

_construct(callback) {
fs.open(this.readFileName, "r", (err, readFd) => {
if (err) return callback(err);
this.readFileDescriptor = readFd;
fs.open(this.writeFileName, "w", (err, writeFd) => {
if (err) return callback(err);
this.writeFileDescriptor = writeFd;
callback();
});
});
}

_write(chunk, encoding, callback) {
this.chunks.push(chunk);
this.chunksSize += chunk.length;

if (this.chunksSize > this.writableHighWaterMark) {
fs.write(this.writeFileDescriptor, Buffer.concat(this.chunks), (err) => {
if (err) {
return callback(err);
}
this.chunks = [];
this.chunksSize = 0;
callback();
});
} else {
// when we're done, we should call the callback function
callback();
}
}

_read(size) {
const buff = Buffer.alloc(size);
fs.read(this.readFileDescriptor, buff, 0, size, null, (err, bytesRead) => {
if (err) return this.destroy(err);
// null is to indicate the end of the stream
this.push(bytesRead > 0 ? buff.subarray(0, bytesRead) : null);
});
}

_final(callback) {
fs.write(this.writeFileDescriptor, Buffer.concat(this.chunks), (err) => {
if (err) return callback(err);
this.chunks = [];
callback();
});
}

_destroy(error, callback) {
callback(error);
}
}

Endi Custom Duplex streamdan foydalanib ko`ramiz:

const duplex = new CustomDuplexStream({
readFileName: "read.txt",
writeFileName: "write.txt",
});

duplex.write(Buffer.from("this is a string 0"));
duplex.write(Buffer.from("this is a string 1\n"));
duplex.write(Buffer.from("this is a string 2"));
duplex.write(Buffer.from("this is a string 3"));
duplex.end(Buffer.from("end of write"));

duplex.on("data", (chunk) => {
console.log(chunk.toString("utf-8"));
});
custom duplex stream natijasi

Xulosa

Ko`rib turganingizdek streamning ichki funksiyalarini o`zgartirish orqali biz xohlagan holatga olib kelishimiz mumkin.

Agarda maqola yoqqan bo`lsa chapak chaling (ko`p chalsayam bo`ladi 50 tagacha).

Yana shunday maqolalar o`qishni xohlasangiz medium da follow tugmasini bosib qo`ying.

Xato va kamchiliklar uchun uzr !!!

Kodlarni ushbu githab repoda topishingiz mumkin =>Repo

linkedin.com => Ulug’bek Habibov | LinkedIn

telegram channel => @habibov_ulugbek

--

--

Habibov Ulug'bek
Habibov Ulug'bek

Written by Habibov Ulug'bek

Software Engineer | Backend Nodejs Developer

No responses yet