Overview

EventBatcher buffers events in memory and flushes them automatically, either when the buffer reaches maxSize or after flushInterval milliseconds pass with no new events. Use it instead of calling event() once per action in high-frequency contexts such as API middleware, webhooks, or background jobs.
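
To make the two flush triggers concrete, here is a minimal sketch of the general buffering pattern. It is not the SDK's implementation: the SketchBatcher class and its send callback are hypothetical names used only for illustration, and the real batcher sends asynchronously.

```typescript
// Illustrative sketch only; SketchBatcher and `send` are hypothetical,
// not part of the ChurnKit SDK.
class SketchBatcher<T> {
  private buffer: T[] = []
  private timer: ReturnType<typeof setTimeout> | undefined
  private send: (events: T[]) => void
  private maxSize: number
  private flushInterval: number

  constructor(send: (events: T[]) => void, maxSize = 100, flushInterval = 5000) {
    this.send = send
    this.maxSize = maxSize
    this.flushInterval = flushInterval
  }

  push(event: T): void {
    this.buffer.push(event)
    if (this.buffer.length >= this.maxSize) {
      this.flush() // size trigger: buffer is full
      return
    }
    // Idle trigger: restart the countdown on every push
    this.clearTimer()
    this.timer = setTimeout(() => this.flush(), this.flushInterval)
  }

  // Sends everything buffered and returns how many events were sent
  flush(): number {
    this.clearTimer()
    if (this.buffer.length === 0) return 0
    const events = this.buffer
    this.buffer = []
    this.send(events)
    return events.length
  }

  private clearTimer(): void {
    if (this.timer !== undefined) {
      clearTimeout(this.timer)
      this.timer = undefined
    }
  }
}
```

Because the timer restarts on each push, a steady stream of events is flushed by the size trigger, and the idle trigger only fires once traffic pauses.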

Create a batcher

const batcher = churn.createBatcher({
  flushInterval: 5000,  // flush after 5s of inactivity
  maxSize: 50,          // flush immediately when 50 events accumulate
})

Signature

churn.createBatcher(options?: BatcherOptions): EventBatcher

BatcherOptions

flushInterval
number
Flush the buffer after this many milliseconds of inactivity. Default: 5000. The timer resets on every push(). Must be a positive finite number.
maxSize
number
Flush immediately when the buffer reaches this size. Default: 100. Must be between 1 and 500.
onFlush
(count: number) => void
Optional callback called after each successful flush. Receives the number of events flushed.
onError
(err: unknown) => void
Optional callback called when a flush fails. Use this to log errors without crashing.
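
The documented constraints on flushInterval and maxSize can be checked before the options reach createBatcher, for example when they come from environment configuration. The sketch below is a hypothetical helper, not an SDK function; it assumes maxSize must be a whole number, which the reference above does not state explicitly.

```typescript
// Hypothetical pre-flight check; not part of the ChurnKit SDK.
interface BatcherOptionsSketch {
  flushInterval?: number
  maxSize?: number
}

// Returns a list of problems; an empty list means the options look valid.
function checkBatcherOptions(options: BatcherOptionsSketch = {}): string[] {
  const problems: string[] = []
  // Defaults taken from the reference above
  const { flushInterval = 5000, maxSize = 100 } = options

  if (!Number.isFinite(flushInterval) || flushInterval <= 0) {
    problems.push('flushInterval must be a positive finite number')
  }
  // Assumption: maxSize is an integer count of events
  if (!Number.isInteger(maxSize) || maxSize < 1 || maxSize > 500) {
    problems.push('maxSize must be between 1 and 500')
  }
  return problems
}
```

Returning a list rather than throwing lets the caller report every problem at once and decide whether to fall back to the defaults.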

EventBatcher interface

interface EventBatcher {
  push(event: BulkEventItem): void
  flush(): Promise<number>
  destroy(): Promise<void>
}

push(event)

Adds an event to the buffer. Triggers an immediate flush if maxSize is reached.
batcher.push({
  userId: 'user_123',
  event: 'api_called',
  properties: { endpoint: '/v1/data', method: 'GET' },
})
Calling push() after destroy() throws a ChurnKitError with code VALIDATION_ERROR.
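
In code paths that can still run during shutdown, a late push() may hit a destroyed batcher. The sketch below shows one way to tolerate that. It assumes the thrown error exposes its code as a code property; tryPush and hasCode are hypothetical helpers, and the check is structural so the sketch does not depend on how ChurnKitError is exported.

```typescript
// Hypothetical helpers; not part of the ChurnKit SDK.
interface PushOnly<E> {
  push(event: E): void
}

// Assumption: the error carries its code on a `code` property
function hasCode(err: unknown, code: string): boolean {
  return typeof err === 'object' && err !== null &&
    (err as { code?: unknown }).code === code
}

// Returns true if the event was buffered, false if the batcher rejected it
// with VALIDATION_ERROR (for example because destroy() was already called).
function tryPush<E>(batcher: PushOnly<E>, event: E): boolean {
  try {
    batcher.push(event)
    return true
  } catch (err) {
    if (hasCode(err, 'VALIDATION_ERROR')) return false
    throw err // anything else is unexpected, so let it surface
  }
}
```

VALIDATION_ERROR may also be raised for other invalid input, so treat a false result as "not recorded" rather than as proof that the batcher is closed.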

flush()

Immediately flushes all buffered events. Returns a promise that resolves with the number of events sent.
const count = await batcher.flush()
console.log(`Flushed ${count} events`)

destroy()

Flushes remaining events, cancels the flush timer, and permanently closes the batcher. Call this on process shutdown or component unmount.
await batcher.destroy()
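
Because destroy() performs a final flush over the network, a shutdown hook may want to cap how long it waits. The following is a sketch under that assumption; destroyWithTimeout is a hypothetical helper, not an SDK function, and the right timeout depends on your platform's shutdown grace period.

```typescript
// Hypothetical helper; not part of the ChurnKit SDK.
interface Destroyable {
  destroy(): Promise<void>
}

type ShutdownResult = 'flushed' | 'failed' | 'timed_out'

// Waits for destroy() to settle, but gives up after timeoutMs.
function destroyWithTimeout(
  batcher: Destroyable,
  timeoutMs: number,
): Promise<ShutdownResult> {
  return new Promise<ShutdownResult>((resolve) => {
    const timer = setTimeout(() => resolve('timed_out'), timeoutMs)
    batcher.destroy().then(
      () => { clearTimeout(timer); resolve('flushed') },
      () => { clearTimeout(timer); resolve('failed') },
    )
  })
}

// Possible usage in a Node.js process:
// process.on('SIGTERM', async () => {
//   await destroyWithTimeout(batcher, 2000)
//   process.exit(0)
// })
```

The helper never rejects, so the shutdown path can always proceed to exit regardless of how the final flush went.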

Examples

Express middleware

import express from 'express'
import ChurnKit from '@vgpprasad91/churnkit-sdk'

const app = express()
const churn = new ChurnKit({ apiKey: process.env.CHURNKIT_API_KEY! })
const batcher = churn.createBatcher({ flushInterval: 3000, maxSize: 50 })

// Assumes earlier auth middleware has populated req.user
app.use((req, res, next) => {
  if (req.user) {
    batcher.push({
      userId: req.user.id,
      event: 'api_called',
      properties: { method: req.method, path: req.path },
    })
  }
  next()
})

// Flush on graceful shutdown
process.on('SIGTERM', async () => {
  await batcher.destroy()
  process.exit(0)
})

Next.js App Router middleware

// middleware.ts
import { NextResponse } from 'next/server'
import type { NextRequest } from 'next/server'

// Module-level batcher (persists across requests in the same process)
import { batcher } from '@/lib/churnkit-batcher'

export function middleware(request: NextRequest) {
  // Assumes the session cookie value is the user ID; adapt to your auth setup
  const userId = request.cookies.get('session')?.value
  if (userId) {
    batcher.push({ userId, event: 'page_viewed', properties: { path: request.nextUrl.pathname } })
  }
  return NextResponse.next()
}

With error handling

const batcher = churn.createBatcher({
  flushInterval: 5000,
  maxSize: 100,
  onFlush: (count) => logger.info(`ChurnKit: flushed ${count} events`),
  onError: (err) => logger.error('ChurnKit flush failed', err),
})
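
Beyond logging, the onFlush and onError callbacks can feed simple counters for a health check or metrics endpoint. The sketch below relies only on the callback signatures documented above; createFlushStats and the FlushStats shape are hypothetical names introduced for illustration.

```typescript
// Hypothetical helper; not part of the ChurnKit SDK.
interface FlushStats {
  flushes: number        // successful flushes
  flushedEvents: number  // total events sent across all flushes
  failures: number       // failed flushes
  lastError: unknown     // most recent flush error, if any
}

function createFlushStats() {
  const stats: FlushStats = {
    flushes: 0,
    flushedEvents: 0,
    failures: 0,
    lastError: undefined,
  }
  return {
    stats,
    onFlush: (count: number): void => {
      stats.flushes += 1
      stats.flushedEvents += count
    },
    onError: (err: unknown): void => {
      stats.failures += 1
      stats.lastError = err
    },
  }
}

// Possible usage:
// const { stats, onFlush, onError } = createFlushStats()
// const batcher = churn.createBatcher({ onFlush, onError })
```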

Serverless (flush at end of request)

In serverless environments where timers don’t persist, flush manually at the end of each handler:
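// Hypothetical event shape for illustration: each record carries a userId.
// Replace with your trigger's real event type (API Gateway events have no Records field).
interface RecordsEvent {
  Records: Array<{ userId: string }>
}

export async function handler(event: RecordsEvent) {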
  const batcher = churn.createBatcher({ maxSize: 500 })

  for (const record of event.Records) {
    batcher.push({ userId: record.userId, event: 'processed' })
  }

  await batcher.destroy() // flush before Lambda exits
  return { statusCode: 200 }
}
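
If handler code between createBatcher() and destroy() can throw, the final flush is skipped. One way to guard against that is a try/finally wrapper, sketched below. withBatcher and the ClosableBatcher interface are hypothetical names, not SDK exports.

```typescript
// Hypothetical helper; not part of the ChurnKit SDK.
interface ClosableBatcher<E> {
  push(event: E): void
  destroy(): Promise<void>
}

// Creates a batcher, runs the work, and always calls destroy() afterwards,
// even when the work throws.
async function withBatcher<E, R>(
  create: () => ClosableBatcher<E>,
  work: (batcher: ClosableBatcher<E>) => Promise<R> | R,
): Promise<R> {
  const batcher = create()
  try {
    return await work(batcher)
  } finally {
    await batcher.destroy() // flush before the invocation ends
  }
}

// Possible usage inside a handler:
// return withBatcher(
//   () => churn.createBatcher({ maxSize: 500 }),
//   (batcher) => { /* push events */ return { statusCode: 200 } },
// )
```

Note that if destroy() itself rejects, that rejection replaces the result or error from the work function.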