Aggregate filter plugin

Groups information across events that share a common task identifier so you can produce correlated outputs (such as transaction durations, per-session summaries, or paired request/response records). State is held in memory per pipeline worker, which means you should run aggregating pipelines on a single worker.

  • Package: logstash-filter-aggregate
  • Coverage source: default/bundled
  • Official catalog entry: Yes

Plugin overview

The aggregate plugin runs in the Logstash filter stage. It aggregates information across events that belong to the same task.

Typical use cases

  • Correlate multiple related events into task/session-level outputs.
  • Transform fields before indexing to keep schema and naming consistent.

Input and output behavior

  • Flow: Correlates events by task key and emits aggregate state on completion/timeout.
  • Input: works on events that match your surrounding if conditions.
  • Output: updates the current event in place unless configured otherwise.
  • Important options: code, task_id, aggregate_maps_path, end_of_task.
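
A common pattern built from these options is to pair a "start" event with an "end" event for the same task. A minimal sketch, assuming illustrative field names (logger, taskid, duration, sql_duration) that you would replace with your own:

filter {
  if [logger] == "TASK_START" {
    aggregate {
      task_id => "%{taskid}"
      code => "map['sql_duration'] = 0"
      map_action => "create"
    }
  }
  if [logger] == "SQL" {
    aggregate {
      task_id => "%{taskid}"
      code => "map['sql_duration'] += event.get('duration')"
      map_action => "update"
    }
  }
  if [logger] == "TASK_END" {
    aggregate {
      task_id => "%{taskid}"
      code => "event.set('sql_duration', map['sql_duration'])"
      map_action => "update"
      end_of_task => true
      timeout => 120
    }
  }
}

The TASK_END event receives the accumulated sql_duration, and end_of_task then discards the map entry.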

Options

Required

  • code (type: string; default: none) — Ruby code executed on each matching event; the shared state is available as map and the current event as event.
  • task_id (type: string; default: none) — Sprintf expression that produces the shared task key used to correlate related events (for example %{transaction_id}).
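
Because task_id is a sprintf expression, it can reference nested fields as well as top-level ones. A hypothetical sketch (the [request][id] field name is illustrative):

filter {
  aggregate {
    task_id => "%{[request][id]}"
    code => "map['count'] ||= 0; map['count'] += 1"
  }
}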

Optional

  • aggregate_maps_path (type: string; default: none) — File path used to persist aggregate state across pipeline restarts.
  • end_of_task (type: boolean; default: false) — When true, deletes the map entry for the task after the code block runs (use this on the event that ends the task).
  • inactivity_timeout (type: number; default: none) — Seconds without a new event before a task is flushed (must be lower than timeout).
  • map_action (type: string; default: "create_or_update") — Controls whether the filter creates a new map entry, updates an existing one, or both (create, update, or create_or_update).
  • push_map_as_event_on_timeout (type: boolean; default: false) — Emit the aggregated state as a new event when the task times out.
  • push_previous_map_as_event (type: boolean; default: false) — Emit the previous task's aggregated state when a new task id is seen (requires events to arrive in order).
  • timeout (type: number; default: 1800) — Seconds to retain an idle task before it is considered complete and optionally emitted.
  • timeout_code (type: string; default: none) — Ruby code run against the new event produced by a timeout flush.
  • timeout_tags (type: array; default: []) — Tags added to events produced by timeout flushes to help downstream routing.
  • timeout_task_id_field (type: string; default: none) — Field on the flushed event that stores the originating task id.
  • timeout_timestamp_field (type: string; default: none) — When set, the timestamp of the event generated on timeout is taken from this field rather than the time of the flush.
  • map_count_warning_threshold (type: number; default: 5000) — Logs a warning when the number of in-memory aggregate maps exceeds this threshold.
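
When a stream has no explicit end-of-task event, push_previous_map_as_event or push_map_as_event_on_timeout can emit the aggregate instead. A sketch assuming an illustrative session_id field and events arriving in order:

filter {
  aggregate {
    task_id => "%{session_id}"
    code => "map['lines'] ||= 0; map['lines'] += 1"
    push_previous_map_as_event => true
    timeout => 300
    timeout_task_id_field => "session_id"
    timeout_tags => ["aggregate_flush"]
  }
}

Each time a new session_id appears, the previous session's map is pushed as its own event; timeout_task_id_field copies the originating session id onto that event.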

Example configuration

filter {
  aggregate {
    task_id => "%{transaction_id}"
    code => "map['event_count'] ||= 0; map['event_count'] += 1"
    push_map_as_event_on_timeout => true
    timeout => 120
    inactivity_timeout => 30
    timeout_tags => ["aggregated_transaction"]
  }
}
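
As noted above, aggregate state is held in memory per pipeline worker, so the pipeline running this filter should be limited to a single worker, for example:

# logstash.yml, or the relevant pipeline entry in pipelines.yml
pipeline.workers: 1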

Common options configuration

All Logstash filter plugins support these shared options:

  • add_field (type: hash; default: {}) — Adds fields when the filter succeeds. Supports dynamic field names and values.
  • add_tag (type: array; default: []) — Adds one or more tags when the filter succeeds.
  • enable_metric (type: boolean; default: true) — Enables or disables metric collection for this plugin instance.
  • id (type: string; default: none) — Sets an explicit plugin instance ID for monitoring and troubleshooting.
  • periodic_flush (type: boolean; default: false) — Calls the filter flush method at regular intervals.
  • remove_field (type: array; default: []) — Removes fields when the filter succeeds. Supports dynamic field names.
  • remove_tag (type: array; default: []) — Removes tags when the filter succeeds.
Example applying the common options:

filter {
  aggregate {
    add_field => { "pipeline_stage" => "parsed" }
    add_tag => ["parsed", "logstash_filter"]
    enable_metric => true
    id => "my_filter_instance"
    periodic_flush => false
    remove_field => ["tmp_field"]
    remove_tag => ["temporary"]
  }
}

Apply in Logit.io

  1. Open your stack in Logit.io and navigate to Logstash Pipelines.
  2. In the filter { ... } section, add an aggregate block.
  3. Save your pipeline changes, then restart the Logstash pipeline if prompted.
  4. Send sample events and verify parsed/enriched fields in OpenSearch Dashboards.

Validation checklist

  • Confirm the aggregate block compiles without syntax errors.
  • Verify expected new/updated fields exist in sample documents.
  • Verify unexpected fields are not removed unless explicitly configured.
  • Confirm tags added on success/failure align with your alerting and routing rules.

Troubleshooting

  • If events are unchanged, verify your filter condition (if ...) matches incoming events.
  • If the pipeline fails to start, validate braces/quotes and retry with a minimal filter block.
  • If throughput drops, reduce expensive operations and test with representative sample volume.
