NATS JetStream Worker في TypeScript: Retries و
٢ يونيو ٢٠٢٦
يحتاج عامل NATS JetStream المستدام في TypeScript إلى ثلاثة أشياء نادراً ما توضحها الوثائق معاً: مستهلك سحب مستدام (durable pull consumer) مع تأكيد استلام صريح، و سياسة إعادة محاولة تعتمد على max_deliver، و ack_wait، وتأخير nak، و طابور رسائل مهملة (dead-letter queue) تبنيه بنفسك لأن JetStream لا يوفر واحداً بشكل افتراضي. يربط هذا الدليل هذه العناصر الثلاثة على Node 24.
ملخص
يبني هذا الدليل العملي عامل أحداث (event worker) جاهز للإنتاج على NATS JetStream باستخدام عميل @nats-io/jetstream 3.4 الموديولار الحالي1 — وليس حزمة nats v2 الموحدة والقديمة التي لا تزال معظم الشروحات تستخدمها. ستقوم بتشغيل خادم JetStream في Docker، وإنشاء مشروع TypeScript يقوم بتشغيل ملفات .ts مباشرة على Node 24 بدون خطوة بناء، وإنشاء تيار (stream) ومستهلك سحب مستدام، ونشر الأحداث بشكل متكرر (idempotently) باستخدام msgID، واستهلاكها مع تأكيد صريح عبر ack/nak/term، وإعادة محاولة الفشل مع تأخير متزايد (backoff delay)، وتوجيه الرسائل السامة (poison messages) إلى طابور الرسائل المهملة. يستخدم الدليل Node.js 24 LTS، و @nats-io/jetstream 3.4.0، و @nats-io/transport-node 3.4.0، و nats-server 2.14.12. تم فحص الأنواع في كل ملف مقابل الحزم المنشورة في 2 يونيو 2026. يستغرق الأمر حوالي 30-40 دقيقة.
ما ستتعلمه
- تشغيل خادم NATS مفعل به JetStream محلياً باستخدام Docker
- إنشاء مشروع Node 24 TypeScript يقوم بتشغيل
.tsمباشرة بدون خطوة بناء - نمذجة أحداث النطاق (domain events) بعقد مكتوب (typed contract) مشترك بين المنتج والعامل
- إنشاء تيار ومستهلك سحب مستدام مع إعدادات الاستبقاء والتأكيد وإعادة المحاولة الصحيحة
- نشر الأحداث بشكل متكرر (idempotently) بحيث لا تؤدي إعادة محاولة النشر إلى إنشاء نسخة مكررة
- بناء عامل يستهلك الرسائل مع تأكيد استلام صريح ومكرر غير متزامن (async iterator)
- إعادة محاولة الإخفاقات العابرة باستخدام
nak()المؤجل وفق جدول زمني للتراجع (backoff) من جهة العميل - إضافة طابور رسائل مهملة (dead-letter queue) للرسائل السامة، بما أن JetStream لا يحتوي على DLQ أصلي
- إغلاق العامل بشكل آمن (gracefully) حتى لا تضيع الرسائل التي قيد المعالجة
المتطلبات الأساسية
- Node.js 24 LTS أو أحدث. Node 24 هو خط LTS النشط الحالي، مدعوم حتى أبريل 2028، ويقوم بتشغيل ملفات TypeScript بشكل أصلي بدون
ts-nodeأوtsx34. تحقق باستخدامnode --version. - Docker (Desktop 4.x أو أي محرك) لتشغيل خادم NATS. تحقق باستخدام
Docker --version. - معرفة عملية بـ TypeScript و
async/await. - طرفية (Terminal). واجهة سطر أوامر
natsالاختيارية5 مفيدة للفحص ولكنها ليست مطلوبة — سنقوم بالتحقق عبر الكود.
التقنيات المستخدمة: nats-server 2.14.1 للاستمرارية، وعميل @nats-io/jetstream 3.4.0 الموديولار مع ناقل Node الخاص به @nats-io/transport-node 3.4.0 ومساعدات @nats-io/nats-core 3.4.0. إذا كنت قد رأيت استيرادات nats 2.x مع JSONCodec واستدعاء js.subscribe() القائم على الدفع (push-based) في منشورات قديمة، فإن تلك الحزمة الموحدة هي الجيل السابق؛ عميل v3 يقسم الناقل، والأساس، و JetStream إلى حزم منفصلة ويستخدم واجهة برمجة تطبيقات consume() القائمة على السحب (pull-based) API1.
هذا العامل هو التطبيق العملي للمفاهيم الموجودة في بنية الأنظمة القائمة على الأحداث، وبديل أصلي لـ NATS لـ طابور مهام مدعوم بـ Postgres مع pg-boss.
الخطوة 1 — تشغيل NATS مع JetStream
JetStream هو طبقة الاستمرارية داخل nats-server. لا يتم تفعيله افتراضياً — تقوم بتفعيله باستخدام علامة -js6. ابدأ خادماً باستخدام وسم صورة محدد:
Docker run --name nats-js -p 4222:4222 nats:2.14.1-alpine3.22 -js
المنفذ 4222 هو منفذ العميل الافتراضي. يجب أن ترى الخادم يسجل سطراً يحتوي على Starting JetStream بالقرب من أعلى المخرجات. اترك هذه الطرفية تعمل؛ وافتح واحدة جديدة لبقية البرنامج التعليمي.
للحفاظ على بيانات التيار عبر عمليات إعادة تشغيل الحاوية، قم بتركيب وحدة تخزين (volume) ووجه JetStream إليها:
Docker run --name nats-js -p 4222:4222 \
-v "$(pwd)/nats-data:/data" \
nats:2.14.1-alpine3.22 -js -sd /data
تخبر علامة -sd (دليل التخزين) JetStream بمكان كتابة التيارات المدعومة بالملفات. بدونها، يعيش تيار تخزين File فقط طالما بقيت الحاوية تعمل.
الخطوة 2 — إنشاء مشروع Node 24 TypeScript
أنشئ مشروعاً يمكن لـ Node 24 تشغيله بدون خطوة تجميع. المفتاح هو "type": "module" بحيث يعامل Node ملفات .ts كـ ES modules، بالإضافة إلى ملف tsconfig.json يستخدم فقط لفحص الأنواع — يقوم Node بتجريد الأنواع في وقت التشغيل ولا يقوم أبداً بفحص الأنواع4.
mkdir orders-worker && cd orders-worker
npm init -y
npm pkg set type=module
npm install @nats-io/jetstream@3.4.0 @nats-io/transport-node@3.4.0 @nats-io/nats-core@3.4.0
npm install -D TypeScript@5 @types/node@24
قم بتثبيت إصدار @types/node على الإصدار الرئيسي 24 بحيث تتطابق الأنواع مع بيئة تشغيل Node 24 الخاصة بك؛ حيث يتبع إصدار 25 إصدار Node 25 (الحالي)، وليس إصدار LTS الذي تقوم بتشغيله. الآن أضف ملف tsconfig.json:
{
"compilerOptions": {
"target": "ES2023",
"module": "NodeNext",
"moduleResolution": "NodeNext",
"allowImportingTsExtensions": true,
"rewriteRelativeImportExtensions": true,
"strict": true,
"noUncheckedIndexedAccess": true,
"verbatimModuleSyntax": true,
"skipLibCheck": true,
"noEmit": true,
"lib": ["ES2023"],
"types": ["node"]
},
"include": ["src/**/*.ts"]
}
هناك إعدادان مهمان للتنفيذ الأصلي. يسمح لك allowImportingTsExtensions بكتابة import "./events.ts" بالامتداد الحقيقي، وهو ما يتطلبه محلل ESM الصارم في Node — فهو لا يخمن الامتدادات4. يجبرك verbatimModuleSyntax على تمييز استيرادات الأنواع فقط بـ import type، مما يحافظ على تجريد الأنواع بشكل لا لبس فيه. noEmit صحيح لأن TypeScript هو مدقق الكود (linter) الخاص بك هنا، وليس المترجم.
أضف نص فحص الأنواع (type-check script) إلى package.json:
{
"scripts": {
"typecheck": "tsc -p tsconfig.json"
}
}
قم بتشغيل npm run typecheck في أي وقت — يجب ألا يطبع شيئاً ويخرج بـ 0. تقوم بتشغيل البرامج الفعلية باستخدام node src/<file>.ts العادي.
الخطوة 3 — نمذجة أحداث النطاق الخاصة بك
ضع العقد الذي يشترك فيه المنتج والعامل في ملف واحد بحيث يؤدي تغيير اسم حقل إلى كسر كلا الجانبين في وقت فحص الأنواع. هذه هي العادة الأكثر قيمة في الكود القائم على الأحداث: الرسالة هي الـ API.
// src/events.ts
export interface OrderPlaced {
orderId: string;
email: string;
amountCents: number;
}
export const STREAM = "ORDERS";
export const SUBJECTS = "orders.>";
export const PLACED_SUBJECT = "orders.placed";
export const DURABLE = "email-sender";
export const DLQ_SUBJECT = "orders.dlq";
export const MAX_DELIVER = 5;
export const BACKOFF_MS = [1_000, 5_000, 15_000, 30_000];
يستخدم SUBJECTS حرف البدل > بحيث يلتقط التيار كل موضوع orders.* (مثل orders.placed، و orders.refunded، وما إلى ذلك)، بينما يقوم المستهلك في الخطوة 4 بالتصفية للوصول إلى orders.placed فقط. يتم مشاركة MAX_DELIVER بين تكوين المستهلك وفحص DLQ الخاص بالعامل حتى لا يختلف الاثنان أبداً، و BACKOFF_MS هو جدول تأخير إعادة المحاولة الذي يطبقه العامل في الخطوة 6.
الخطوة 4 — إنشاء التيار والمستهلك المستدام
الـ تيار (stream) يقوم بتخزين الرسائل؛ أما المستهلك (consumer) فهو مؤشر ذو حالة (stateful cursor) يتتبع الرسائل التي أكد العامل استلامها. نقوم بإنشاء كليهما مرة واحدة، مسبقاً، باستخدام مدير JetStream. التيارات والمستهلكون المستدامون تعريفية (declarative) — استدعاء add مرة أخرى بنفس الاسم هو عملية متكررة (idempotent) لنفس التكوين.
// src/setup.ts
import { connect } from "@nats-io/transport-node";
import {
jetstreamManager,
AckPolicy,
DeliverPolicy,
RetentionPolicy,
StorageType,
} from "@nats-io/jetstream";
import { nanos } from "@nats-io/nats-core";
import { STREAM, SUBJECTS, DURABLE, PLACED_SUBJECT, MAX_DELIVER } from "./events.ts";
const nc = await connect({ servers: process.env.NATS_URL ?? "localhost:4222" });
const jsm = await jetstreamManager(nc);
const stream = await jsm.streams.add({
name: STREAM,
subjects: [SUBJECTS],
retention: RetentionPolicy.Limits,
storage: StorageType.File,
max_age: nanos(7 * 24 * 60 * 60 * 1000),
num_replicas: 1,
});
console.log(`stream ${stream.config.name} <- [${stream.config.subjects.join(", ")}]`);
const consumer = await jsm.consumers.add(STREAM, {
durable_name: DURABLE,
ack_policy: AckPolicy.Explicit,
deliver_policy: DeliverPolicy.All,
filter_subject: PLACED_SUBJECT,
ack_wait: nanos(30_000),
max_deliver: MAX_DELIVER,
max_ack_pending: 100,
});
console.log(`consumer ${consumer.name} ready (max_deliver=${consumer.config.max_deliver})`);
await nc.drain();
كل مدة زمنية في JetStream تكون بالنانو ثانية، لذا فإن المساعد nanos() من @nats-io/nats-core يقوم بتحويل الملي ثانية نيابة عنك — nanos(30_000) تعادل 30 ثانية2. إعدادات المستهلك (consumer) هي المكان الذي يحدد سلوك إعادة المحاولة7:
| الإعداد | القيمة هنا | ما يتحكم فيه |
|---|---|---|
ack_policy | Explicit | يجب تأكيد استلام كل رسالة بشكل فردي. بدون هذا لن تكون هناك إعادة تسليم. |
deliver_policy | All | البدء من أول رسالة في الـ stream. المستهلكون الدائمون (durable) الجدد يعيدون تشغيل السجل التاريخي. |
filter_subject | orders.placed | تسليم هذا الموضوع فقط، حتى لو كان الـ stream يلتقط orders.>. |
ack_wait | 30s | إذا تم تسليم رسالة ولم يتم تأكيدها (ack) أو رفضها (nak) أو تعليمها كـ working() خلال هذه النافذة، يقوم JetStream بإعادة تسليمها. هذه هي شبكة الأمان الخاصة بك ضد الانهيارات. |
max_deliver | 5 | حد أقصى لإجمالي محاولات التسليم. بعد المحاولة الخامسة، يتوقف JetStream عن إعادة التسليم من تلقاء نفسه. |
max_ack_pending | 100 | عدد الرسائل غير المؤكدة التي يمكن أن تكون قيد المعالجة في وقت واحد — سقف التوازي الخاص بك. |
سياسة RetentionPolicy.Limits تحتفظ بالرسائل حتى يتم الوصول إلى حد العمر أو الحجم الخاص بالـ stream، بغض النظر عن الاستهلاك — وهو أمر جيد لسجل أحداث يقرأه مستهلكون متعددون. (في المقابل، سياسة الاحتفاظ Workqueue تحذف الرسالة بمجرد تأكيد استلامها). النوع StorageType.File يحفظ البيانات على القرص حتى لا يؤدي إعادة تشغيل الخادم إلى فقدان الـ stream.
يوجد أيضًا خيار backoff على مستوى المستهلك (مصفوفة من تأخيرات النانو ثانية)، ولكن لاحظ شيئين من وثائق NATS قبل استخدامه: إنه ينطبق فقط على مهلات تأكيد الاستلام (acknowledgement timeouts)، وليس على الـ nak الصريح، وعند ضبطه، فإن قيمته الأولى تتجاوز ack_wait7. نظرًا لأن هذا البرنامج التعليمي يدير عمليات إعادة المحاولة باستخدام nak صريح (الخطوة 6)، فإننا نترك backoff غير مفعل ونبقي ack_wait كمهلة واضحة مدتها 30 ثانية، مع تطبيق جدول التراجع (backoff) بأنفسنا على العميل.
قم بتشغيله:
node src/setup.ts
المخرجات المتوقعة:
stream ORDERS <- [orders.>]
consumer email-sender ready (max_deliver=5)
الخطوة 5 — نشر الأحداث بشكل متكرر (Idempotently)
النشر إلى JetStream يعيد PubAck يؤكد تخزين الرسالة ويعطيك رقم تسلسلها في الـ stream. الخيار المهم هو msgID: إذا قمت بالنشر مرتين بنفس الـ msgID داخل نافذة التكرار الخاصة بالـ stream، فسيقوم JetStream بتخزينها مرة واحدة ويعود الـ PubAck الثاني مع duplicate: true8. هذا يجعل عملية النشر آمنة لإعادة المحاولة: إذا تسبب خلل في الشبكة في إعادة الإرسال، فسيتم تخزين الحدث مرة واحدة فقط بدلاً من إنشاء نسخة مكررة.
// src/producer.ts
import { connect } from "@nats-io/transport-node";
import { jetstream } from "@nats-io/jetstream";
import type { OrderPlaced } from "./events.ts";
import { PLACED_SUBJECT } from "./events.ts";
const nc = await connect({ servers: process.env.NATS_URL ?? "localhost:4222" });
const js jetstream(nc);
const order: OrderPlaced = {
orderId: crypto.randomUUID(),
email: "buyer@example.com",
amountCents: 4200,
};
const ack = await js.publish(PLACED_SUBJECT, JSON.stringify(order), {
msgID: order.orderId,
});
console.log(`stored seq=${ack.seq} duplicate=${ack.duplicate}`);
await nc.drain();
crypto.randomUUID() هو متغير عالمي في Node، لا حاجة لاستيراده. قم بتشغيله مرة واحدة:
node src/producer.ts
# stored seq=1 duplicate=false
قم بتشغيله مرة ثانية وسيرتفع التسلسل لأن كل استدعاء يولد orderId جديدًا. لكي ترى منع التكرار، قم بتثبيت المعرف orderId: "fixed-id-123" وشغله مرتين بسرعة — ستطبع المرة الثانية duplicate=true ولن يتقدم الـ stream. هذه هي الطريقة التي تجعل بها منتج الأحداث آمنًا لإعادة المحاولة بعد خلل في الشبكة.
الخطوة 6 — بناء العامل (worker) الذي يستهلك ويؤكد الاستلام
يحصل العامل على المستهلك بالاسم ويستدعي consume()، الذي يعيد مكررًا غير متزامن (async iterable) للرسائل. تسحب الرسائل عن طريق التكرار؛ وتخبر JetStream بنتيجة كل منها عن طريق استدعاء ack() أو nak() أو term() على الرسالة9.
// src/worker.ts
import { connect } from "@nats-io/transport-node";
import { jetstream } from "@nats-io/jetstream";
import type { OrderPlaced } from "./events.ts";
import { STREAM, DURABLE, DLQ_SUBJECT, MAX_DELIVER, BACKOFF_MS } from "./events.ts";
const nc = await connect({ servers: process.env.NATS_URL ?? "localhost:4222" });
const js = jetstream(nc);
const consumer = await js.consumers.get(STREAM, DURABLE);
async function sendReceipt(order: OrderPlaced): Promise<void> {
if (order.amountCents <= 0) throw new Error(`invalid amount for ${order.orderId}`);
console.log(`receipt sent to ${order.email} for order ${order.orderId}`);
}
const messages = await consumer.consume({ max_messages: 10 });
process.on("SIGTERM", () => messages.close());
process.on("SIGINT", () => messages.close());
console.log(`worker up; waiting for "${STREAM" messages`);
for await (const m of messages) {
let order: OrderPlaced;
try {
order = m.json<OrderPlaced>();
} catch {
m.term("unparseable payload");
continue;
}
try {
await sendReceipt(order);
m.ack();
} catch (err) {
if (m.info.deliveryCount >= MAX_DELIVER) {
await js.publish(DLQ_SUBJECT, m.data, { msgID: `dlq-${m.seq` });
m.term(`moved to DLQ after ${m.info.deliveryCount attempts`);
console.error(`DLQ: order ${order.orderId (${String(err))`);
} else {
const delayMs = BACKOFF_MS[Math.min(m.info.deliveryCount - 1, BACKOFF_MS.length - 1)] ?? 30_000;
console.warn(`retry ${m.info.deliveryCount/${MAX_DELIVER for ${order.orderId in ${delayMsms`);
m.nak(delayMs);
}
}
}
await nc.drain();
ثلاث نتائج للرسائل تقوم بكل العمل:
m.ack()— نجاح. يقوم JetStream بتقديم المستهلك ولا يعيد تسليم هذه الرسالة أبدًا.m.nak(delayMs)— تأكيد سلبي مع تأخير صريح. الـm.nak()المجرد يعيد التسليم فورًا تقريبًا؛ تمرير تأخير بالملي ثانية هو ما يؤدي فعليًا إلى تراجع (back off) إعادة المحاولة7. نحن نستخدمBACKOFF_MSبناءً على عدد عمليات التسليم الحالية، لذا تنتظر المحاولات 1 ثانية، 5 ثوانٍ، 15 ثانية، ثم 30 ثانية. استخدم هذا للفشل المؤقت (مهلة في الخدمة التابعة، خطأ 503).m.term(reason)— إنهاء. التوقف عن إعادة تسليم هذه الرسالة نهائيًا، بغض النظر عنmax_deliver. استخدم هذا للفشل الدائم (حمولة لن يتم تحليلها أبدًا).
يقوم m.json<OrderPlaced>() بفك تشفير جسم JSON إلى النوع الخاص بك؛ ويقوم الـ try/catch المحيط بتحويل الرسالة غير القابلة للتحليل إلى term() حتى لا تؤدي حمولة واحدة سيئة إلى تعطيل العامل للأبد. m.info.deliveryCount هو عدد المرات التي تم فيها تسليم هذه الرسالة المحددة — وهي القيمة التي يعتمد عليها كل من مؤشر التراجع وفحص DLQ في الخطوة 7.
قم بتشغيل العامل (في نافذة Terminal خاصة به، مع التأكد من أن المنتج من الخطوة 5 قد نشر مرة واحدة على الأقل):
node src/worker.ts
المخرجات المتوقعة:
worker up; waiting for "ORDERS" messages
receipt sent to buyer@example.com for order 7c3e9b12-5a4f-4c0e-9b1a-2f6d8e4a1c33
اتركه يعمل وقم بتشغيل node src/producer.ts مرة أخرى من نافذة Terminal أخرى — سيطبع العامل سطر إيصال جديد في غضون لحظة.
الخطوة 7 — إضافة طابور الرسائل المهملة (Dead-letter queue)
لا يحتوي JetStream على طابور رسائل مهملة مدمج10. عندما تصل الرسالة إلى max_deliver، يتوقف JetStream ببساطة عن إعادة تسليمها — وبدون خطوة DLQ صريحة، يكون الفشل صامتًا. النمط المتبع هو اكتشاف المحاولة النهائية بنفسك، ونسخ الحمولة الخام إلى موضوع DLQ، وإنهاء (term()) الرسالة الأصلية بحيث يتم إزالتها من المجموعة النشطة فورًا. هذا هو بالضبط ما يفعله فرع else/if في الخطوة 6:
if (m.info.deliveryCount >= MAX_DELIVER) {
await js.publish(DLQ_SUBJECT, m.data, { msgID: `dlq-${m.seq` });
m.term(`moved to DLQ after ${m.info.deliveryCount attempts`);
}
هناك تفصيلان يجعلان هذا الأمر قويًا. أولاً، أعد نشر m.data (الـ Uint8Array الخام)، وليس كائنًا تمت إعادة تسلسله — يجب أن يحتفظ الـ DLQ بالبايتات الأصلية لإعادة التشغيل الجنائي. ثانيًا، الـ msgID: \dlq-${m.seq}`يربط نشر الـ DLQ بالتسلسل الأصلي للـ stream، لذا إذا تعطل العامل بينpublishوterm` وتم إعادة تسليم الرسالة، فلن تكتب إعادة التشغيل إدخال DLQ مكررًا.
لكي يكون الـ DLQ طابورًا حقيقيًا يمكنك فحصه وإعادة تشغيله، تأكد من أن موضوع orders.dlq يتم التقاطه بواسطة stream. يغطيه بالفعل الرمز العام orders.> الخاص بـ stream الـ ORDERS، لذا تهبط رسائل DLQ مرة أخرى في نفس الـ stream تحت موضوع مختلف. في نظام أكبر، ستعطي الـ DLQ الـ stream الخاص به مع فترة احتفاظ أطول ومستهلك للتنبيه. لتجربة المسار بالكامل، انشر طلبًا بـ amountCents: 0 — سيقوم sendReceipt برمي خطأ في كل مرة، وسيسجل العامل retry 1/5 … retry 4/5، وفي التسليم الخامس سيعيد النشر إلى orders.dlq وينهي الرسالة الأصلية.
الخطوة 8 — الإغلاق التدريجي (Graceful shutdown)
العامل (worker) الذي يتم إيقافه في منتصف معالجة رسالة يجب ألا يفقد تلك الرسالة. الجزآن المذكوران أعلاه يتعاملان بالفعل مع هذا الأمر. messages.close() (المرتبط بـ SIGTERM و SIGINT) يتوقف عن سحب رسائل جديدة ولكنه يسمح للدورة الحالية بالانتهاء، لذا تنتهي حلقة for await بشكل نظيف. بعد الحلقة، يقوم nc.drain() بتفريغ التأكيدات (acks) المعلقة وإغلاق الاتصال بالترتيب11. أي رسالة تم تسليمها ولكن لم يتم تأكيدها بعد عند خروج العملية، يتم إعادة تسليمها تلقائيًا بمجرد انقضاء ack_wait (30 ثانية)، لأن JetStream لا يعتبر أبدًا الرسالة غير المؤكدة منتهية. هذا هو ضمان الأمان للتأكيد الصريح: التسليم "مرة واحدة على الأقل" ينجو من الانهيار12.
اضغط على Ctrl+C في نافذة العامل وسترى الحلقة تتوقف والعملية تخرج دون تحذير بشأن اتصال معلق.
التحقق
قم بتأكيد خط الأنابيب بالكامل من الكود، لا حاجة لواجهة الأوامر (CLI). أنشئ src/verify.ts:
// src/verify.ts
import { connect } from "@nats-io/transport-node";
import { jetstreamManager } from "@nats-io/jetstream";
import { STREAM, DURABLE } from "./events.ts";
const nc = await connect({ servers: process.env.NATS_URL ?? "localhost:4222" });
const jsm = await jetstreamManager(nc);
const info = await jsm.streams.info(STREAM);
console.log(`stream messages: ${info.state.messages}`);
const c = await jsm.consumers.info(STREAM, DURABLE);
console.log(`pending (unacked + undelivered): ${c.num_pending}`);
console.log(`redelivered in flight: ${c.num_redelivered}`);
await nc.drain();
node src/verify.ts
بعد نشر بضعة طلبات صحيحة وترك العامل ينهيها، يجب أن ينخفض pending إلى 0. إذا قمت بنشر طلب "سام" (amountCents: 0)، فسترى رسالة إضافية واحدة في التدفق (نسخة DLQ على orders.dlq) وسيعكس num_redelivered محاولات إعادة المحاولة. إذا كان لديك CLI الخاص بـ nats5 مثبتًا، فإن nats consumer report ORDERS يعرض نفس الأرقام في جدول.
الأخطاء الشائعة
هذه هي الإخفاقات التي يواجهها الأشخاص فعليًا، مستمدة من متتبع مشكلات NATS وملف README الخاص بالعميل.
Error: no respondersأو توقفconnect()— لا يمكن الوصول إلى الخادم على المنفذ4222، أو أنك قمت بتشغيله بدون خيار-js. تأكد من أن حاوية Docker تعمل (Docker ps) وأن سجلها يظهرStarting JetStream. تتوقف استدعاءات JetStream API أو تعطي خطأ عندما يكون JetStream معطلاً.An import path can only end with a '.ts' extension when 'allowImportingTsExtensions' is enabled— ملفtsconfig.jsonالخاص بك يفتقد إلىallowImportingTsExtensions: true. يحتاج Node إلى امتداد.tsفي محدد الاستيراد، ويحتاج مدقق النوع (type-checker) إلى هذا العلم للسماح بذلك4.ERR_UNKNOWN_FILE_EXTENSIONعند تشغيلnode src/setup.ts— أنت تستخدم إصدار Node أقدم من 22.18 / 24، أو أن"type": "module"مفقود منpackage.json. تنفيذ TypeScript الأصلي مستقر في Node 24؛ تحقق منnode --version4.- الرسائل لا تعيد التسليم أبدًا بعد الفشل — سياسة التأكيد (
ack_policy) للمستهلك ليستExplicit، أو أن المعالج الخاص بك يستدعيack()في كتلةfinallyالتي تعمل حتى عند حدوث خطأ. قم باستدعاءack()فقط عند النجاح الفعلي؛ واستخدمnak()أوterm()عند الفشل. - نفس الرسالة تعيد التسليم للأبد — أنت تقوم بعمل
nakلفشل دائم. الحمولة التي لا يمكن أن تنجح أبدًا يجب إنهاؤها باستخدامterm()(أو إرسالها إلى DLQ)، وليس عملnakلها. تأكد من أن فرع DLQ يقارن بـmax_deliverبشكل صحيح باستخدام>=، وليس>.
الخطوات التالية ومزيد من القراءة
لديك الآن عامل متين، يدعم إعادة المحاولة، ومدعوم بـ DLQ في أقل من 150 سطرًا من TypeScript. من هنا:
- امنح DLQ تدفقًا خاصًا به مع فترة احتفاظ أطول ومستهلك تنبيه منفصل، بدلاً من مشاركة تدفق
ORDERS. - أضف مستهلكًا متينًا ثانيًا على نفس التدفق (على سبيل المثال
analytics) — مع احتفاظLimits، يقرأ عدة مستهلكين نفس الأحداث بشكل مستقل. هذه هي الميزة الأساسية على طابور القارئ الواحد. - قم بالتوسع أفقيًا عن طريق تشغيل نسخ متعددة من العامل مقابل نفس المستهلك المتين؛ يقوم JetStream بموازنة تحميل الرسائل عبرها حتى يصل إلى
max_ack_pending.
للخلفية المعمارية، راجع بنية الأنظمة القائمة على الأحداث والمفاضلات بين الوسطاء في Kafka مقابل RabbitMQ مقابل SQS مقابل NATS. للحصول على بديل مبني على قاعدة بياناتك الحالية، قارن مع طابور مهام pg-boss المدعوم بـ Postgres.
Footnotes
-
@nats-io/jetstreamعلى npm — 3.4.0 هو وسم التوزيعlatestالحالي (نُشر في 2026-05-08). عميل v3 معياري: النقل، والأساس، و JetStream تُشحن كحزم منفصلة. https://www.npmjs.com/package/@nats-io/jetstream ↩ ↩2 -
إصدارات
nats-server— v2.14.1 صدر في 2026-05-20. يتم التعبير عن مدد JetStream بالنانو ثانية. https://GitHub.com/nats-io/nats-server/releases ↩ ↩2 -
جدول إصدارات Node.js — Node 24 هو خط LTS النشط الحالي، مدعوم حتى أبريل 2028. https://nodejs.org/en/about/previous-releases ↩
-
"Running TypeScript Natively | Node.js" و "Modules: TypeScript | Node.js Documentation" — تجريد الأنواع (type stripping) مفعل افتراضيًا (منذ v22.18.0 / v23.6.0، ومستقر في 24)، ولا يقوم Node بفحص الأنواع في وقت التشغيل، ويتطلب تحليل ESM امتدادات استيراد
.tsصريحة. https://nodejs.org/learn/TypeScript/run-natively ↩ ↩2 ↩3 ↩4 ↩5 -
natscli(أداة سطر أوامرnats) — v0.4.0 صدر في 2026-05-01. https://GitHub.com/nats-io/natscli/releases ↩ ↩2 -
"JetStream | NATS Docs" — تشغيل صورة
natsDocker الرسمية مع علم-jsلتفعيل JetStream. https://docs.nats.io/running-a-nats-service/nats_docker/jetstream_docker ↩ -
"Consumers | NATS Docs" — دلالات
ack_policy، وack_wait، وmax_deliver، وbackoffللمستهلكين من نوع pull. https://docs.nats.io/nats-concepts/jetstream/consumers ↩ ↩2 ↩3 -
JetStreamPublishOptions.msgID— الرسالة التي يتم نشرها باستخدامmsgIDتمت رؤيته بالفعل داخلduplicate_windowالخاص بالتدفق يتم تخزينها مرة واحدة فقط؛ ويعيدPubAckالقيمةduplicate: true. https://docs.nats.io/nats-concepts/jetstream/streams ↩ -
@nats-io/jetstreamREADME — تعيد الـ API الخاصة بـconsume()من نوع pull مكررًا غير متزامن (async iterable) من نوعJsMsg؛ يتم إنهاء كل رسالة باستخدامack()، أوnak()، أوterm()، أوworking(). https://GitHub.com/nats-io/nats.js/blob/main/jetstream/README.md ↩ -
لا يحتوي NATS JetStream على طابور رسائل مهملة (dead-letter queue) أصلي؛ النهج الموثق هو اكتشاف استنفاد
max_deliver(عبر تنبيهات عدد التسليم أو عدد تسليم الرسالة) وإعادة النشر إلى موضوع DLQ في تطبيقك. https://docs.nats.io/nats-concepts/jetstream/consumers ↩ -
@nats-io/transport-nodeعلى npm (3.4.0) يوفرconnect()وNatsConnection.drain()لبيئة Node. https://www.npmjs.com/package/@nats-io/transport-node ↩ -
@nats-io/jetstreamREADME — يوفر JetStream تسليمًا "مرة واحدة على الأقل" (at-least-once)؛ حيث يؤدي التأكيد الصريح (explicit acknowledgement) إلى إعادة تسليم الرسائل التي لم يتم تأكيدها. https://GitHub.com/nats-io/nats.js/blob/main/jetstream/README.md ↩