Edge AI in Practice: Building Privacy-Preserving, Low-Latency Intelligence on Devices

Mini-Project: Real-Time Vision on a Camera Module

Chapter 15

Project Goal and What You Will Build

Goal: build a small, end-to-end real-time vision app that runs entirely on a camera module device and produces an on-device output (overlay, GPIO trigger, or serial message) with predictable frame rate and stable behavior.

What you will build: a camera pipeline that captures frames, preprocesses them, runs a compact vision model, postprocesses predictions, and drives an output. You will implement a “vision loop” that is robust to camera hiccups, variable lighting, and occasional inference slowdowns.

Example tasks (pick one): (1) person detection to trigger an LED, (2) package/box detection for a tabletop demo, (3) simple hand-gesture classification to control a menu, (4) defect/not-defect classification for a fixed scene. Choose a task that can be demonstrated with a static camera and a small set of objects.

Choosing a Camera Module and Defining Constraints

Pick a target device: this mini-project assumes a “camera module” class of device, meaning a small SBC or microcontroller board with a camera interface (CSI or USB) and enough compute to run a lightweight model. Examples include a Raspberry Pi with a camera, an embedded Linux camera board, or an MCU-class camera board with an integrated sensor.

Define constraints up front: write down (a) target FPS, (b) maximum acceptable end-to-end latency, (c) input resolution, (d) output behavior (overlay vs. GPIO), and (e) memory ceiling. Even if you already know how to budget latency in general, for this project you will translate those constraints into concrete design choices: frame size, model input size, and whether to run inference on every frame or every Nth frame.

Recommended starting point: 320×240 capture, model input 224×224 (or 192×192), inference every frame if possible; otherwise every 2nd frame with tracking or smoothing in between.
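
Sketch (Python, assumptions noted): the constraints above can be written down as a small config object that derives the per-frame time budget and an inference stride. The class name VisionConstraints, its fields, and the 15 FPS and 200 ms values in the usage note are hypothetical placeholders, not values prescribed by this project.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class VisionConstraints:
    """Hypothetical container for the constraints written down up front."""
    target_fps: float       # (a) frames per second the loop should sustain
    max_latency_ms: float   # (b) capture-to-action latency ceiling
    capture_size: tuple     # (c) capture resolution as (width, height)
    model_input: tuple      # model input size as (width, height)

    @property
    def frame_budget_ms(self) -> float:
        # Time available per frame at the target frame rate.
        return 1000.0 / self.target_fps

    def inference_stride(self, measured_pipeline_ms: float) -> int:
        # Run inference every Nth frame so one pass fits in N frame periods.
        return max(1, math.ceil(measured_pipeline_ms / self.frame_budget_ms))
```

Usage: with VisionConstraints(15, 200, (320, 240), (224, 224)), the frame budget is about 66.7 ms, so a measured 50 ms pipeline can run on every frame, while a 100 ms pipeline suggests inference on every 2nd frame.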

Pipeline Overview: Capture → Preprocess → Infer → Postprocess → Act

Capture: acquire frames from the camera with a stable timestamp. Prefer a pipeline that can deliver frames in a consistent format (e.g., YUV or RGB) and that supports fixed exposure/white balance if your device allows it.

Preprocess: resize/crop to model input, convert color space, normalize, and arrange tensor layout. Keep preprocessing lightweight; on many devices, preprocessing can become the bottleneck if done naively.

Infer: run the model on the preprocessed tensor. Your code should treat inference as a black box call that returns raw outputs.

Postprocess: decode outputs into meaningful results (class label, bounding boxes, confidence). Apply thresholds and temporal smoothing to avoid flicker.

Act: render an overlay, toggle a GPIO pin, send a UART/serial message, or publish a local event. Keep actuation deterministic and non-blocking.

Step-by-Step: Implement the Real-Time Vision Loop

Step 1 — Create a minimal camera capture program

Objective: confirm you can capture frames at the desired resolution and FPS before adding any ML.

  • Set a fixed capture resolution (start with 320×240 or 640×480).
  • Measure actual FPS and dropped frames.
  • Log timestamps and frame indices.
# Pseudocode (language-agnostic) for capture timing instrumentation
init_camera(width=320, height=240, fps=30)
frame_count = 0
start = now()
while running:
  frame = camera.read()
  t = now()
  frame_count += 1
  if frame_count % 60 == 0:
    elapsed = t - start
    print("capture_fps=", frame_count/elapsed)

Practical tip: if your capture FPS is unstable, fix exposure and white balance (if supported) and reduce resolution. Many camera stacks will silently change internal processing when auto-exposure is hunting, which can destabilize timing.
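
Sketch (Python, assumptions noted): one way to measure actual FPS and estimate dropped frames from logged timestamps. FpsMeter and estimate_dropped are hypothetical helpers; the camera read itself is left out because it depends on your camera stack. Timestamps can be injected, which makes the meter easy to test without hardware.

```python
import time
from collections import deque
from typing import Optional


class FpsMeter:
    """Rolling FPS estimate over the last `window` frame timestamps."""

    def __init__(self, window: int = 60):
        self.stamps = deque(maxlen=window)

    def tick(self, t: Optional[float] = None) -> None:
        # Call once per captured frame; pass `t` to inject a timestamp.
        self.stamps.append(time.monotonic() if t is None else t)

    def fps(self) -> float:
        if len(self.stamps) < 2:
            return 0.0
        span = self.stamps[-1] - self.stamps[0]
        return (len(self.stamps) - 1) / span if span > 0 else 0.0


def estimate_dropped(stamps, nominal_period_s: float) -> int:
    """Estimate dropped frames from gaps longer than one nominal period."""
    stamps = list(stamps)
    dropped = 0
    for earlier, later in zip(stamps, stamps[1:]):
        dropped += max(0, round((later - earlier) / nominal_period_s) - 1)
    return dropped
```

A rolling window is used here instead of a since-start average so that a slowdown late in a run is visible quickly.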

Step 2 — Add preprocessing with explicit, testable outputs

Objective: ensure the model input tensor is correct and consistent.

  • Decide on a resize strategy: letterbox (preserve aspect ratio with padding) or center-crop (fill the model input).
  • Make preprocessing deterministic: same input frame should produce the same tensor.
  • Validate by saving a few preprocessed images to disk (or display them) to confirm orientation, color, and cropping.
# Pseudocode for letterbox resize
function letterbox(image, target_w, target_h):
  scale = min(target_w/image.w, target_h/image.h)
  new_w = round(image.w * scale)
  new_h = round(image.h * scale)
  resized = resize(image, new_w, new_h)
  canvas = zeros(target_h, target_w, 3)
  x0 = (target_w - new_w) // 2
  y0 = (target_h - new_h) // 2
  canvas[y0:y0+new_h, x0:x0+new_w] = resized
  return canvas, scale, x0, y0

Common failure modes: swapped color channels (BGR vs. RGB), rotated frames (sensor orientation), and incorrect normalization (e.g., scaling pixels to [0, 1] when the model expects [-1, 1]). Catch these early by visualizing the preprocessed input.
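
Sketch (Python, assumptions noted): two of these failure modes can also be caught with cheap numeric checks. The helper names below are hypothetical, and which normalization range is correct depends on how your model was trained, so treat the ranges as examples.

```python
def bgr_to_rgb(pixel):
    """Swap channel order for a single (b, g, r) pixel."""
    b, g, r = pixel
    return (r, g, b)


def normalize_unit(v):
    """Map an 8-bit value 0..255 to [0, 1]."""
    return v / 255.0


def normalize_signed(v):
    """Map an 8-bit value 0..255 to [-1, 1]."""
    return v / 127.5 - 1.0


def check_range(values, lo, hi):
    """Return True if every value lies in [lo, hi]."""
    return all(lo <= v <= hi for v in values)
```

Usage: running check_range over a flattened input tensor right before inference flags a tensor normalized to [-1, 1] that is being fed to a model expecting [0, 1], which visual inspection alone can miss.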

Step 3 — Integrate inference and isolate it behind an interface

Objective: keep your vision loop clean by wrapping inference in a small module with a stable API.

  • Create a function like predict(input_tensor) → outputs.
  • Warm up the model with a few dummy runs to stabilize first-run overhead.
  • Record inference time separately from preprocessing and capture time.
# Pseudocode for timing breakdown
while running:
  t0 = now()
  frame = camera.read()
  t1 = now()
  tensor = preprocess(frame)
  t2 = now()
  outputs = model.predict(tensor)
  t3 = now()
  result = postprocess(outputs)
  t4 = now()
  log({
    "capture_ms": (t1-t0)*1000,
    "pre_ms": (t2-t1)*1000,
    "infer_ms": (t3-t2)*1000,
    "post_ms": (t4-t3)*1000
  })

Practical tip: if inference time varies widely, pin CPU frequency/performance mode if your platform supports it, and avoid running heavy logging or rendering in the same thread as inference.
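
Sketch (Python, assumptions noted): a thin wrapper that hides the runtime behind predict, performs warm-up runs, and records inference time on its own. The Predictor class and the run_fn callable are hypothetical; run_fn stands in for whatever call your inference runtime actually exposes.

```python
import time
from typing import Any, Callable


class Predictor:
    """Wraps a runtime-specific inference callable behind a stable API."""

    def __init__(self, run_fn: Callable[[Any], Any], warmup_input: Any,
                 warmup_runs: int = 3):
        self._run = run_fn
        self.last_infer_ms = 0.0
        # Warm-up: absorb first-run overhead before timing real frames.
        for _ in range(warmup_runs):
            self._run(warmup_input)

    def predict(self, tensor: Any) -> Any:
        t0 = time.perf_counter()
        outputs = self._run(tensor)
        # Inference time only; preprocessing and capture are timed elsewhere.
        self.last_infer_ms = (time.perf_counter() - t0) * 1000.0
        return outputs
```

Because the vision loop only sees predict, the runtime can be swapped or mocked in tests without touching capture or postprocessing code.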

Step 4 — Implement postprocessing with thresholds and temporal smoothing

Objective: convert raw outputs into stable, user-visible behavior.

For classification: apply a confidence threshold and a “debounce” window. For example, require the same class to be top-1 for 3 out of the last 5 frames before triggering an action.

For detection: decode boxes, apply non-maximum suppression (NMS) if needed, then track the best box across frames using a simple IoU-based association. Even a lightweight tracker can reduce flicker when inference is run every N frames.

# Pseudocode for classification smoothing
history = queue(maxlen=5)
function stable_class(pred_class, pred_conf):
  if pred_conf < 0.6:
    history.push("none")
  else:
    history.push(pred_class)
  counts = count_values(history)
  best = argmax(counts)
  if counts[best] >= 3 and best != "none":
    return best
  return "none"

Why this matters: real-time demos often fail not because the model is “wrong,” but because the output flickers. Smoothing makes behavior feel intentional and reliable.
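
Sketch (Python, assumptions noted): the IoU-based association mentioned for detection can be as small as the two functions below. Boxes are assumed to be (x1, y1, x2, y2) tuples, and the 0.3 minimum IoU is an illustrative default, not a recommended value.

```python
def iou(a, b):
    """Intersection over union of two (x1, y1, x2, y2) boxes."""
    ix1, iy1 = max(a[0], b[0]), max(a[1], b[1])
    ix2, iy2 = min(a[2], b[2]), min(a[3], b[3])
    inter = max(0.0, ix2 - ix1) * max(0.0, iy2 - iy1)
    area_a = (a[2] - a[0]) * (a[3] - a[1])
    area_b = (b[2] - b[0]) * (b[3] - b[1])
    union = area_a + area_b - inter
    return inter / union if union > 0 else 0.0


def associate(prev_box, candidates, min_iou=0.3):
    """Return the candidate overlapping prev_box the most, or None."""
    if prev_box is None or not candidates:
        return None
    best = max(candidates, key=lambda c: iou(prev_box, c))
    return best if iou(prev_box, best) >= min_iou else None
```

Usage: keep the last accepted box as prev_box; when associate returns None, either start a new track from the highest-score detection or treat the frame as a miss and let the smoothing window absorb it.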

Step 5 — Add an action layer (overlay, GPIO, or messages)

Objective: make the system do something observable without blocking the vision loop.

  • Overlay: draw boxes/labels and show FPS. Ensure rendering is optional so you can disable it for performance testing.
  • GPIO: toggle an LED or relay when a target is detected. Add a minimum on/off time to avoid rapid toggling.
  • Serial/UART: send a compact message like DETECT person 0.82 at a limited rate (e.g., 10 Hz max).
# Pseudocode for GPIO actuation with hold time
state = {"on": false, "until": 0}
function act(detected, now):
  if detected:
    gpio.set_high()
    state.on = true
    state.until = now + 0.5  # hold 500ms
  else if state.on and now > state.until:
    gpio.set_low()
    state.on = false
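
Sketch (Python, assumptions noted): rate limiting for the serial/UART option. RateLimitedSender is a hypothetical helper; send_fn stands in for whatever write call your serial library provides, and the newline terminator is an assumption about the receiver.

```python
class RateLimitedSender:
    """Sends at most `max_hz` messages per second; extra calls are skipped."""

    def __init__(self, send_fn, max_hz=10.0):
        self._send = send_fn
        self._min_interval = 1.0 / max_hz
        self._last_sent = None

    def maybe_send(self, label, score, now):
        # Skip the message if the previous one went out too recently.
        if (self._last_sent is not None
                and now - self._last_sent < self._min_interval):
            return False
        self._send(f"DETECT {label} {score:.2f}\n")
        self._last_sent = now
        return True
```

Skipping instead of queuing keeps the call non-blocking and ensures the receiver never works through a backlog of stale detections.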

Design Pattern: Two-Thread Pipeline for Stable FPS

Problem: a single-thread loop can stutter when inference occasionally takes longer, causing capture to block and frames to pile up.

Pattern: split capture and inference into two threads (or processes) with a bounded queue. The capture thread always keeps the newest frame; the inference thread processes the latest available frame. This reduces latency because you avoid processing stale frames.

  • Capture thread: reads frames and pushes into a queue of size 1–2. If full, drop the oldest frame.
  • Inference thread: pops the newest frame, runs preprocess/infer/postprocess, updates shared “latest result.”
  • Render/act thread (optional): reads the latest frame and latest result to display/actuate.
# Pseudocode for bounded queue behavior
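queue = BoundedQueue(maxsize=2)

capture_thread:
  while running:
    frame = camera.read()
    # The full-check and drop must be atomic with respect to the inference thread
    if queue.full():
      queue.pop_oldest()
    queue.push(frame)

infer_thread:
  while running:
    frame = queue.pop_latest(block=true)  # take the newest frame
    queue.clear()  # discard older leftovers so a stale frame is never processed later
    result = run_pipeline(frame)
    shared.latest_result = result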

Practical tip: when you drop frames, your overlay FPS may look high but your inference FPS might be lower. Track both: capture FPS and inference FPS.
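
Sketch (Python, assumptions noted): a size-1 variant of the bounded queue, written as a single-slot mailbox. LatestSlot is a hypothetical class built on threading.Condition. The capture thread would call put for every frame and the inference thread would call get, and the dropped counter gives the gap between capture FPS and inference FPS.

```python
import threading


class LatestSlot:
    """Single-item mailbox: put() overwrites, get() waits for a fresh item."""

    def __init__(self):
        self._cond = threading.Condition()
        self._item = None
        self._fresh = False
        self.dropped = 0  # frames overwritten before being consumed

    def put(self, item):
        with self._cond:
            if self._fresh:
                self.dropped += 1
            self._item = item
            self._fresh = True
            self._cond.notify()

    def get(self, timeout=None):
        # Returns None on timeout, so None should not be used as a frame.
        with self._cond:
            if not self._cond.wait_for(lambda: self._fresh, timeout=timeout):
                return None
            self._fresh = False
            return self._item
```

With a single slot there is no older frame left behind to process later, so the inference thread can never run on frames out of order.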

Handling Real-World Camera Issues

Lighting variation: if the scene brightness changes, model confidence may oscillate. Use fixed exposure when possible, or add a simple brightness check and adapt the threshold (e.g., lower it slightly in low light, but set a floor so it never drops far enough to cause false positives).
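
Sketch (Python, assumptions noted): one possible brightness-adaptive threshold. The base of 0.6 matches the threshold used elsewhere in this chapter; the floor of 0.5, the low-light level of 60 (on a 0 to 255 scale), and the linear ramp are illustrative assumptions to tune in your environment.

```python
def mean_brightness(gray_values):
    """Average of grayscale pixel values (0..255); 0.0 for an empty input."""
    values = list(gray_values)
    return sum(values) / len(values) if values else 0.0


def adaptive_threshold(brightness, base=0.6, floor=0.5, low_light=60.0):
    """Lower the threshold linearly below `low_light`, never below `floor`."""
    if brightness >= low_light:
        return base
    frac = max(0.0, brightness) / low_light
    return floor + (base - floor) * frac
```

Usage: compute mean_brightness on a subsampled grayscale frame (every 8th pixel is usually enough) so the check adds negligible time to preprocessing.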

Motion blur: fast motion can break detection. If your camera supports it, use a faster shutter speed (shorter exposure time), which may require more light. Alternatively, reduce the action sensitivity: require multiple consecutive detections before triggering.

Lens distortion and focus: small camera modules often have wide-angle lenses and fixed focus. If your task is near-field (tabletop), ensure the lens is focused for that distance. If distortion is severe, consider cropping the central region of interest (ROI) rather than using the full frame.

Practical Debug Toolkit for This Mini-Project

On-screen diagnostics: overlay capture FPS, inference FPS, and last inference time. Also show the top prediction and confidence. This makes performance and correctness issues visible immediately.

Frame snapshots: save a frame and its preprocessed tensor visualization when confidence is near the threshold (e.g., 0.55–0.65). These “edge cases” are the most informative for improving stability.

Event logs: log only state changes (e.g., “person detected ON,” “person detected OFF”) rather than every frame. This keeps logs readable and reduces overhead.
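
Sketch (Python, assumptions noted): logging only on state changes. StateChangeLogger is a hypothetical helper, and the log line format is an example; log_fn can be print, a file write, or a logging call.

```python
class StateChangeLogger:
    """Emits one log line when a boolean state flips, not on every frame."""

    def __init__(self, name, log_fn=print):
        self._name = name
        self._log = log_fn
        self._state = None  # unknown until the first update

    def update(self, is_on, now):
        if is_on == self._state:
            return False  # no change, nothing logged
        self._state = is_on
        self._log(f"{now:.3f} {self._name} {'ON' if is_on else 'OFF'}")
        return True
```

The first call always logs, which records the initial state at startup.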

Step-by-Step: Build a Person-Detection Demo (Concrete Example)

Objective: implement a simple person-detection pipeline that toggles an LED when a person is present in view.

Step 1 — Define I/O and thresholds

  • Input: camera frames at 320×240.
  • Model input: 224×224 RGB.
  • Output: LED on when a person is detected with confidence ≥ 0.6 for at least 3 of the last 5 inference frames.
  • Hold time: keep LED on for 500 ms after last stable detection.

Step 2 — Implement detection postprocessing

Detection outputs vary by model. Some return a fixed list of boxes with class IDs and scores; others return feature maps that need decoding. For this project, treat the runtime output as a list of (class_id, score, box) tuples after decoding.

  • Filter by class_id == person.
  • Filter by score ≥ threshold.
  • Select the highest-score person box.
# Pseudocode for selecting best person detection
function best_person(detections, score_thr):
  best = null
  for det in detections:
    if det.class_id == PERSON and det.score >= score_thr:
      if best == null or det.score > best.score:
        best = det
  return best

Step 3 — Add temporal smoothing and actuation

Use the smoothing function to convert per-frame detections into a stable boolean “person_present.” Then drive the GPIO with hold time to avoid flicker when the person briefly leaves the frame or confidence dips.

# Pseudocode combining detection + smoothing
presence_hist = queue(maxlen=5)

function update_presence(best_det):
  presence_hist.push(best_det != null)
  if sum(presence_hist) >= 3:
    return true
  return false
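
Sketch (Python, assumptions noted): the 3-of-5 smoothing and the 500 ms hold combined into one object whose return value can drive the LED. PresenceOutput is a hypothetical name; time is passed in as now so the logic can be tested without a real clock or GPIO.

```python
from collections import deque


class PresenceOutput:
    """3-of-5 presence smoothing plus a hold time for the output."""

    def __init__(self, window=5, required=3, hold_s=0.5):
        self._hist = deque(maxlen=window)
        self._required = required
        self._hold_s = hold_s
        self._on_until = None

    def update(self, detected, now):
        """Feed one inference result; returns True while the LED should be on."""
        self._hist.append(bool(detected))
        if sum(self._hist) >= self._required:
            # Stable detection: extend the hold window from this moment.
            self._on_until = now + self._hold_s
        return self._on_until is not None and now <= self._on_until
```

Usage: call update(best_det is not None, time.monotonic()) once per inference frame, then set the GPIO high or low from the returned boolean.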

Step 4 — Add overlay for development mode

Development overlay: draw the best person box and score, plus inference timing. Keep a configuration flag so you can disable overlay for headless operation.

  • Box: map from model coordinates back to original frame using letterbox parameters (scale and padding).
  • Text: show person 0.82 and infer 18ms.
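
Sketch (Python, assumptions noted): mapping a box back to the original frame by inverting the letterbox transform. It assumes boxes are (x1, y1, x2, y2) in model-input pixels and that scale, x0, and y0 are the values returned by the letterbox step; if your model emits normalized coordinates, multiply by the model input size first. The function name is hypothetical.

```python
def unletterbox_box(box, scale, x0, y0, frame_w, frame_h):
    """Map a box from letterboxed model coordinates to frame coordinates."""
    x1, y1, x2, y2 = box

    def clamp(v, hi):
        # Keep coordinates inside the frame after undoing padding and scale.
        return min(max(v, 0.0), float(hi))

    return (
        clamp((x1 - x0) / scale, frame_w),
        clamp((y1 - y0) / scale, frame_h),
        clamp((x2 - x0) / scale, frame_w),
        clamp((y2 - y0) / scale, frame_h),
    )
```

For a 320×240 frame letterboxed into 224×224, the scale is 0.7 with 28 pixels of padding above and below, so the box (0, 28, 224, 196) in model coordinates covers the whole original frame.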

Optimizing the End-to-End Experience (Without Rehashing Prior Theory)

Reduce wasted work: if your model is slow, consider running inference every 2nd or 3rd frame and reusing the last result for overlay/actuation. Combine this with smoothing so the user experience remains stable.

Use ROI when the scene is structured: if the camera is fixed and the object appears in a known region (e.g., a doorway), crop to that ROI before resizing. This can improve accuracy and reduce compute.
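
Sketch (Python, assumptions noted): ROI bookkeeping for a fixed camera. The ROI is expressed as fractions of the frame so it survives a resolution change; both function names are hypothetical, and the crop itself is left to your image library.

```python
def roi_to_pixels(roi_frac, frame_w, frame_h):
    """Convert (left, top, right, bottom) fractions to integer pixel bounds."""
    left, top, right, bottom = roi_frac
    return (int(left * frame_w), int(top * frame_h),
            int(right * frame_w), int(bottom * frame_h))


def roi_box_to_frame(box, roi_px):
    """Shift a box found inside the cropped ROI back to full-frame coordinates."""
    ox, oy = roi_px[0], roi_px[1]
    x1, y1, x2, y2 = box
    return (x1 + ox, y1 + oy, x2 + ox, y2 + oy)
```

Usage: for a doorway in the middle half of a 320×240 frame, roi_to_pixels((0.25, 0.0, 0.75, 1.0), 320, 240) gives (80, 0, 240, 240); boxes detected in the crop are shifted back with roi_box_to_frame before drawing the overlay.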

Keep memory predictable: pre-allocate buffers for frames, resized images, and tensors. Avoid per-frame allocations that can cause jitter due to garbage collection or heap fragmentation.

Fail safely: if the camera read fails or inference throws an error, switch to a safe output state (e.g., LED off) and attempt recovery. Real devices will encounter transient camera errors.

# Pseudocode for safe fallback
try:
  frame = camera.read()
  result = run_pipeline(frame)
except Exception as e:
  gpio.set_low()
  result = {"error": true}
  camera.reset_if_needed()

Testing Checklist: What to Verify Before You Call It “Real-Time”

Functional correctness: test with positive and negative examples. For person detection, test empty scene, partial person, person at different distances, and multiple people.

Stability: verify that the output does not flicker under small changes. Adjust thresholds and smoothing window until behavior is consistent.

Performance under load: run for 30 minutes. Watch for FPS degradation, memory growth, or thermal throttling symptoms (e.g., inference time creeping up).
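
Sketch (Python, assumptions noted): a simple way to quantify inference time creeping up during a long run is to compare the median of the first and last windows of logged inference times. The function name and the window of 100 samples are hypothetical choices.

```python
from statistics import median


def inference_drift(infer_ms, window=100):
    """Ratio of median inference time: last `window` samples vs. first."""
    if len(infer_ms) < 2 * window:
        raise ValueError("need at least 2 * window samples")
    return median(infer_ms[-window:]) / median(infer_ms[:window])
```

A ratio near 1.0 suggests stable timing; a ratio well above 1.0 after 30 minutes is a hint to check for thermal throttling or memory growth. Medians are used so that a few outlier frames do not dominate the comparison.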

Recovery: unplug/replug the camera (if possible), cover the lens, and restore. Confirm the pipeline recovers without manual restart.

Extension Ideas (Optional Enhancements)

Multi-class actions: map different classes to different GPIO outputs or messages (e.g., person vs. pet).

Local privacy-preserving logging: store only aggregate counts (e.g., number of detections per minute) rather than frames, and expose them via a local dashboard.
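
Sketch (Python, assumptions noted): aggregate-only logging can be as simple as a counter keyed by minute and label, so no frames or per-event details are retained. MinuteCounter is a hypothetical helper; timestamps are seconds from any monotonic or wall clock.

```python
from collections import Counter


class MinuteCounter:
    """Keeps only per-minute detection counts, never frames."""

    def __init__(self):
        self.counts = Counter()

    def record(self, label, timestamp_s):
        # Bucket the event into its minute index.
        minute = int(timestamp_s // 60)
        self.counts[(minute, label)] += 1

    def per_minute(self, label):
        """Return {minute_index: count} for one label, ordered by minute."""
        return {m: n for (m, lab), n in sorted(self.counts.items())
                if lab == label}
```

Usage: call record from the action layer on each stable detection event, and have the local dashboard read per_minute.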

Calibration mode: add a mode that displays histograms of confidence scores to help choose thresholds in the deployment environment.

Edge-to-edge demo: send only event messages to a local gateway (no images), demonstrating a complete on-device perception-to-action loop.

Now answer the exercise about the content:

In a two-thread real-time vision pipeline using a bounded queue, what is the main reason to drop older frames when the queue is full?

Dropping older frames keeps the inference thread working on the newest available frame, which prevents backlog and reduces perceived latency when inference occasionally slows down.
