Skip to main content

Overview

Backpressure occurs when frames are enqueued faster than they can be encoded, leading to:
  • Memory buildup (queue grows unbounded)
  • Out of memory errors (heap exhausted)
  • Slow performance (system thrashing)
Proper backpressure management ensures stable, efficient encoding.

The Problem

// ❌ BAD: No backpressure management
// encode() only ENQUEUES the frame — it returns immediately, long before
// the codec has processed it. This loop therefore submits 10,000 frames
// far faster than the encoder can drain them, and every queued frame
// pins its (large) pixel buffer in memory.
for (let i = 0; i < 10000; i++) {
  const frame = createFrame(i);
  encoder.encode(frame);  // Queue grows unbounded!
  frame.close();
}
// Memory usage: 📈📈📈 → 💥 CRASH

The Solution

// ✅ GOOD: Monitor and limit queue size
for (let i = 0; i < 10000; i++) {
  const frame = createFrame(i);
  encoder.encode(frame);
  frame.close();

  // Wait while the queue is too large. Use a loop rather than a single
  // one-shot wait: re-check encodeQueueSize after every 'dequeue' event
  // so we only continue once the queue is actually back under the limit.
  while (encoder.encodeQueueSize > 10) {
    await new Promise(resolve => {
      encoder.addEventListener('dequeue', resolve, { once: true });
    });
  }
}

Monitoring Queue Size

The encodeQueueSize property reports the number of frames that have been submitted with encode() but not yet processed:
// encodeQueueSize is a synchronous, read-only counter of frames that
// encode() has accepted but the codec has not yet processed.
const encoder = new VideoEncoder({ /* ... */ });

encoder.configure(config);

console.log('Queue size:', encoder.encodeQueueSize);  // 0

// Each encode() call bumps the counter immediately...
encoder.encode(frame1);
console.log('Queue size:', encoder.encodeQueueSize);  // 1

encoder.encode(frame2);
console.log('Queue size:', encoder.encodeQueueSize);  // 2

// ...and flush() resolves only after every pending frame is encoded,
// which is why the counter reads 0 afterwards.
// After encoding completes
await encoder.flush();
console.log('Queue size:', encoder.encodeQueueSize);  // 0

Simple Throttling

Wait when queue exceeds threshold:
/**
 * Encode a list of frames, pausing whenever the encoder's internal queue
 * exceeds maxQueueSize so memory stays bounded.
 *
 * @param {VideoFrame[]} frames - frames to encode; each is closed right after submission
 * @param {number} [maxQueueSize=10] - throttle threshold for encodeQueueSize
 * @throws rethrows any error reported by the encoder's error callback
 */
async function encodeWithThrottling(frames, maxQueueSize = 10) {
  // Throwing inside the WebCodecs error callback is an uncatchable
  // unhandled exception — the caller's try/catch never sees it.
  // Capture the error here and rethrow it from the encode loop instead.
  let encodeError = null;

  const encoder = new VideoEncoder({
    output: (chunk) => { /* save chunk */ },
    error: (err) => { encodeError = err; }
  });

  encoder.configure({
    codec: 'avc1.42E01E',
    width: 1920,
    height: 1080,
    bitrate: 5_000_000
  });

  for (let i = 0; i < frames.length; i++) {
    // Surface any codec error on the caller's call stack.
    if (encodeError) throw encodeError;

    encoder.encode(frames[i]);
    frames[i].close();  // release the frame's pixel buffer promptly

    // Throttle: poll until the queue drains below the threshold.
    while (encoder.encodeQueueSize > maxQueueSize) {
      await new Promise(resolve => setTimeout(resolve, 10));
    }

    if (i % 100 === 0) {
      console.log(`Progress: ${i}/${frames.length}, Queue: ${encoder.encodeQueueSize}`);
    }
  }

  await encoder.flush();
  encoder.close();
}

Event-Based Backpressure

Use dequeue events for precise control:
/**
 * Event-driven backpressure: fill the encoder queue up to a fixed limit,
 * then wait for 'dequeue' events instead of polling with timers.
 *
 * @param {VideoFrame[]} frames - frames to encode; each is closed right after submission
 * @throws rethrows any error reported by the encoder's error callback
 */
async function encodeWithEvents(frames) {
  // Throwing inside the error callback would be uncatchable by the
  // caller; record the error and rethrow it from the main loop.
  let encodeError = null;

  const encoder = new VideoEncoder({
    output: (chunk) => { /* save chunk */ },
    error: (err) => { encodeError = err; }
  });

  encoder.configure({
    codec: 'avc1.42E01E',
    width: 1920,
    height: 1080,
    bitrate: 5_000_000
  });

  let i = 0;

  // Flat fill-then-wait loop (replaces the recursive encodeNext form,
  // which re-entered itself once per drain cycle).
  while (i < frames.length) {
    if (encodeError) throw encodeError;

    // Fill the queue up to the limit.
    while (i < frames.length && encoder.encodeQueueSize < 10) {
      encoder.encode(frames[i]);
      frames[i].close();
      i++;
    }

    if (i < frames.length) {
      // Wait for at least one frame to leave the queue before refilling.
      await new Promise(resolve => {
        encoder.addEventListener('dequeue', resolve, { once: true });
      });
    }
  }

  await encoder.flush();
  encoder.close();
}

Adaptive Throttling

Adjust queue size based on system load:
/**
 * Encoder wrapper that adapts its backpressure threshold to heap usage.
 *
 * NOTE(review): this mixes Node's process.memoryUsage() with the
 * WebCodecs VideoEncoder — it only runs where both exist (e.g. Electron,
 * or Node with a WebCodecs implementation); confirm for your runtime.
 */
class AdaptiveEncoder {
  constructor(config) {
    this.config = config;       // VideoEncoderConfig handed to configure()
    this.maxQueueSize = 10;     // current throttle threshold, kept in [5, 20]
    this.encoder = null;
    this.encodeError = null;    // error captured from the encoder's error callback
  }

  // Encode all frames, throttling per frame and re-tuning the queue
  // limit every 100 frames based on heap usage.
  async encode(frames) {
    this.encoder = new VideoEncoder({
      output: (chunk) => this.onChunk(chunk),
      // Throwing inside this callback would be an uncatchable unhandled
      // exception; record the error and rethrow from encodeFrame instead.
      error: (err) => { this.encodeError = err; }
    });

    this.encoder.configure(this.config);

    for (let i = 0; i < frames.length; i++) {
      await this.encodeFrame(frames[i]);

      // Adjust queue size based on memory
      if (i % 100 === 0) {
        this.adjustQueueSize();
      }
    }

    await this.encoder.flush();
    this.encoder.close();
  }

  // Submit one frame, then wait until the queue is back under the limit.
  async encodeFrame(frame) {
    if (this.encodeError) throw this.encodeError;

    this.encoder.encode(frame);
    frame.close();

    while (this.encoder.encodeQueueSize > this.maxQueueSize) {
      await new Promise(resolve => setTimeout(resolve, 10));
    }
  }

  // Shrink the limit under memory pressure (>1 GB heap), grow it when
  // comfortable (<512 MB); the limit is clamped to the range [5, 20].
  adjustQueueSize() {
    const memUsage = process.memoryUsage().heapUsed / 1024 / 1024;

    if (memUsage > 1024) {  // > 1GB
      this.maxQueueSize = Math.max(5, this.maxQueueSize - 1);
      console.log(`⚠️  High memory, reducing queue to ${this.maxQueueSize}`);
    } else if (memUsage < 512) {  // < 512MB
      this.maxQueueSize = Math.min(20, this.maxQueueSize + 1);
      console.log(`✓ Low memory, increasing queue to ${this.maxQueueSize}`);
    }
  }

  // Hook for persisting encoded chunks; override or extend as needed.
  onChunk(chunk) {
    // Process chunk
  }
}

Batch Processing

Process frames in manageable batches:
/**
 * Encode frames in fixed-size batches, flushing the encoder after each
 * batch so no more than one batch is ever pending at a time.
 *
 * @param {VideoFrame[]} allFrames - frames to encode; each is closed on submission
 * @param {number} [batchSize=100] - frames submitted between flushes
 */
async function encodeBatched(allFrames, batchSize = 100) {
  const encoder = new VideoEncoder({
    output: (chunk) => { /* save */ },
    error: (err) => { throw err; }
  });

  encoder.configure({
    codec: 'avc1.42E01E',
    width: 1920,
    height: 1080,
    bitrate: 5_000_000
  });

  let offset = 0;
  let batchNumber = 0;

  while (offset < allFrames.length) {
    const batch = allFrames.slice(offset, offset + batchSize);
    batchNumber += 1;

    console.log(`Processing batch ${batchNumber}...`);

    for (const frame of batch) {
      encoder.encode(frame);
      frame.close();
    }

    // Flush batch: drain everything queued before starting the next one.
    await encoder.flush();

    offset += batch.length;

    console.log(`✓ Completed batch (${offset}/${allFrames.length})`);
  }

  encoder.close();
}

Memory Monitoring

Track memory usage during encoding:
/**
 * Encode frames while periodically logging heap growth and queue depth.
 * Optionally triggers a manual GC pass when the heap has grown a lot
 * (requires: node --expose-gc script.js).
 *
 * @param {VideoFrame[]} frames - frames to encode; each is closed on submission
 */
async function encodeWithMonitoring(frames) {
  const encoder = new VideoEncoder({
    output: (chunk) => { /* save */ },
    error: (err) => { throw err; }
  });

  encoder.configure({
    codec: 'avc1.42E01E',
    width: 1920,
    height: 1080,
    bitrate: 5_000_000
  });

  const baselineHeap = process.memoryUsage().heapUsed;
  const total = frames.length;

  for (let index = 0; index < total; index++) {
    encoder.encode(frames[index]);
    frames[index].close();

    // Report heap growth and queue depth every 100 frames.
    if (index % 100 === 0) {
      const heapNow = process.memoryUsage().heapUsed;
      const memDelta = (heapNow - baselineHeap) / 1024 / 1024;

      console.log(`Frame ${index}/${total}`);
      console.log(`  Memory delta: ${memDelta.toFixed(1)} MB`);
      console.log(`  Queue size: ${encoder.encodeQueueSize}`);

      // Force GC if available and the heap has grown past ~500 MB.
      if (global.gc && memDelta > 500) {
        global.gc();
      }
    }

    // Back off until the encoder queue is at or below 10 frames.
    while (encoder.encodeQueueSize > 10) {
      await new Promise(resolve => setTimeout(resolve, 10));
    }
  }

  await encoder.flush();
  encoder.close();
}

// Run with: node --expose-gc script.js

Best Practices

// Too small: slower encoding
maxQueueSize: 2  // ✗ Underutilized

// Too large: memory issues
maxQueueSize: 1000  // ✗ Risk OOM

// Balanced:
maxQueueSize: 15  // ✓ Good balance (anywhere from 10 to 20 works well)
encoder.encode(frame);
frame.close();  // ← Don't delay this!
Closing frames late increases memory pressure.
setInterval(() => {
  const mem = process.memoryUsage().heapUsed / 1024 / 1024;
  console.log(`Memory: ${mem.toFixed(1)} MB`);
}, 5000);
For thousands of frames, process in batches of 100-1000.

Next Steps