
Batch processing

When ingestion volume is high (for example, when recording analytical clickstream data, processing application log streams, or importing massive product catalog updates), sending messages one by one creates unnecessary CPU and network overhead.

To handle high-throughput workloads efficiently, Layeron Queue provides two key optimizations:

  1. Bulk Ingestion (sendBatch): Publish up to 100 messages into the queue in a single API call.
  2. High-Performance Consumer Tuning: Configure the consumer’s batchSize and concurrency parameters to lease and process multiple messages concurrently.
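Because a single sendBatch call accepts at most 100 messages, a larger set has to be split before it is sent. The following is a minimal sketch of one way to do that, not Layeron's own mechanism: chunk, sendInChunks, and MAX_BATCH_SIZE are hypothetical helpers, and the sendBatch argument stands in for any function that accepts an array of messages (for instance, a thin wrapper around a queue's sendBatch method).

```typescript
// Assumption: the limit mirrors the "up to 100 messages" figure stated above.
const MAX_BATCH_SIZE = 100

// Split an array into consecutive slices of at most `size` items.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError("size must be a positive integer")
  }
  const chunks: T[][] = []
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size))
  }
  return chunks
}

// Send every message using as few batch calls as possible.
// `sendBatch` is injected so the sketch does not depend on the real client.
// Returns the number of batch calls that were made.
async function sendInChunks<T>(
  messages: readonly T[],
  sendBatch: (batch: T[]) => Promise<unknown>,
  size: number = MAX_BATCH_SIZE,
): Promise<number> {
  let calls = 0
  for (const batch of chunk(messages, size)) {
    await sendBatch(batch) // sequential: one call in flight at a time
    calls += 1
  }
  return calls
}
```

With 250 messages this makes three calls of 100, 100, and 50 messages. The calls are awaited one after another to keep the sketch simple; if one call fails, the earlier chunks have already been sent, so a caller that retries should make its messages safe to enqueue twice.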

The example below receives a batch of client-side tracking events in a single HTTP request, pushes them immediately to the queue using .sendBatch(), and tunes the consumer to process events concurrently in batches of 50.

import { backend } from "@layeron/core"
import { queue } from "@layeron/modules"

const app = backend()

// 1. Declare the queue with high-throughput consumer tuning
const telemetryQueue = queue({
  name: "telemetry-events",
  // Tune consumer batching and concurrency
  consumer: {
    batchSize: 50, // Fetch up to 50 messages in a single lease batch
    concurrency: 10, // Run up to 10 batch-handling worker instances concurrently
    visibilityTimeoutSeconds: 60, // Lock messages for 60 seconds during processing
  },
  retry: {
    maxAttempts: 3,
    backoff: "fixed",
    initialDelaySeconds: 5,
  },
})

app.use(telemetryQueue)

// 2. High-Throughput Bulk Ingestion Route
app.post("/api/telemetry", async (request) => {
  const body = await request.json() as { events: Array<{ type: string; url: string; userId: string }> }

  if (!Array.isArray(body?.events)) {
    return new Response("Invalid events list", { status: 400 })
  }

  // sendBatch accepts at most 100 messages per call, so reject larger requests up front.
  if (body.events.length > 100) {
    return new Response("Too many events (maximum 100 per request)", { status: 413 })
  }

  // Map events to the QueueSendInput format.
  // We can attach unique idempotency keys or custom headers per message if needed.
  const messagesToSend = body.events.map((event) => ({
    payload: {
      type: event.type,
      url: event.url,
      userId: event.userId,
      ip: request.headers.get("cf-connecting-ip") || "unknown",
      timestamp: new Date().toISOString(),
    },
    // Optional: deduplicate identical events (same user, type, and URL) received at the same instant.
    // The URL is part of the key so that distinct events in one request do not share a key.
    idempotencyKey: `${event.userId}_${event.type}_${event.url}_${Date.now()}`,
  }))

  // Send up to 100 messages in a single batch call
  const result = await telemetryQueue.sendBatch(messagesToSend)

  return Response.json({
    status: "accepted",
    messagesEnqueued: result.messages.length,
  }, { status: 202 })
})

// 3. Batch Consumer Handler
// Because batchSize is 50, Layeron can deliver up to 50 messages per batch.
// The consumer handler runs concurrently up to 10 times.
telemetryQueue.consume(async (message) => {
  const event = message.payload
  // Process the individual message (e.g. record metrics, write to Layeron Database, push to Storage)
  await saveTelemetryEvent(event)
})

async function saveTelemetryEvent(event: any) {
  // Save to persistent storage...
}
  1. Reduced Network Roundtrips: By calling telemetryQueue.sendBatch(...), your app writes many items with one queue call. This dramatically reduces latency compared to invoking send() 100 times in a loop.
  2. Batch Delivery: The consumer is configured with batchSize: 50, so Layeron can deliver up to 50 messages to the consumer at a time. This reduces per-message overhead in high-volume workloads.
  3. Concurrency Control: With concurrency: 10, Layeron allows up to 10 consumer executions to run simultaneously. This helps drain the queue quickly during traffic peaks.
  4. Visibility Timeout Safety: Since the consumer processes up to 50 messages per batch, the visibilityTimeoutSeconds is set to 60 seconds. This ensures that even if database operations are slightly slow under load, the messages will not unlock and get re-leased by another worker before the consumer finishes.
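The tuning values above can be sanity-checked with a little arithmetic. The sketch below is a rough estimate only: ConsumerTuning, maxInFlight, and perMessageBudgetSeconds are hypothetical names, and the per-message figure assumes a worker handles the messages of one batch one after another, which the text above does not confirm.

```typescript
// Hypothetical shape mirroring the consumer options used in the example.
interface ConsumerTuning {
  batchSize: number
  concurrency: number
  visibilityTimeoutSeconds: number
}

// Upper bound on messages leased at the same time:
// every concurrent execution holding a full batch.
function maxInFlight(t: ConsumerTuning): number {
  return t.batchSize * t.concurrency
}

// Average time available per message before the lease expires,
// ASSUMING the messages of one batch are processed sequentially.
function perMessageBudgetSeconds(t: ConsumerTuning): number {
  return t.visibilityTimeoutSeconds / t.batchSize
}

const tuning: ConsumerTuning = {
  batchSize: 50,
  concurrency: 10,
  visibilityTimeoutSeconds: 60,
}

console.log(maxInFlight(tuning)) // 500
console.log(perMessageBudgetSeconds(tuning)) // 1.2
```

Under that assumption, a storage write that regularly takes longer than about 1.2 seconds per message would call for a longer visibility timeout or a smaller batch size.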