Aggregate filter plugin
Groups information across events that share a common task identifier so you can produce correlated outputs (such as transaction durations, per-session summaries, or paired request/response records). State is held in memory per pipeline worker, which means you should run aggregating pipelines on a single worker.
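To enforce the single-worker requirement, the worker count can be pinned per pipeline in `pipelines.yml` (or with the `-w 1` command-line flag). A sketch, where the pipeline id and config path are illustrative:

```yaml
# pipelines.yml — pin the aggregating pipeline to one worker so every
# event for a given task is processed against the same in-memory map
- pipeline.id: aggregate-pipeline                       # illustrative id
  path.config: "/etc/logstash/conf.d/aggregate.conf"    # illustrative path
  pipeline.workers: 1
```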
- Package: `logstash-filter-aggregate`
- Coverage source: default/bundled
- Official catalog entry: Yes
Plugin overview
`aggregate` runs in the Logstash filter stage and collects information across events that belong to the same task.
Typical use cases
- Correlate multiple related events into task/session-level outputs.
- Transform fields before indexing to keep schema and naming consistent.
Input and output behavior
- Flow: Correlates events by task key and emits aggregate state on completion/timeout.
- Input: works on events that match your surrounding `if` conditions.
- Output: updates the current event in place unless configured otherwise.
- Important options: `code`, `task_id`, `aggregate_maps_path`, `end_of_task`.
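As a sketch of how these options combine, the classic pattern pairs a start event and an end event per task and computes a duration. The field names (`transaction_id`, `status`) and the `status` values are assumptions for illustration:

```
filter {
  if [status] == "start" {
    aggregate {
      task_id => "%{transaction_id}"
      # store the start time in the shared map (as a float for arithmetic)
      code => "map['started_at'] = event.get('@timestamp').to_f"
      map_action => "create"
    }
  }
  if [status] == "end" {
    aggregate {
      task_id => "%{transaction_id}"
      # enrich the end event with the elapsed time, then drop the map entry
      code => "event.set('duration_s', event.get('@timestamp').to_f - map['started_at'])"
      map_action => "update"
      end_of_task => true
      timeout => 120
    }
  }
}
```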
Options
Required
- `code` (type: string; default: none) — Ruby code executed on each matching event; the shared state is available as `map` and the current event as `event`.
- `task_id` (type: string; default: none) — Sprintf expression that produces the shared task key used to correlate related events (for example `%{transaction_id}`).
Optional
- `aggregate_maps_path` (type: string; default: none) — File path used to persist aggregate state across pipeline restarts.
- `end_of_task` (type: boolean; default: `false`) — When true, deletes the map entry for the task after the code block runs (use this on the event that ends the task).
- `inactivity_timeout` (type: number; default: none) — Seconds without a new event before a task is flushed (must be lower than `timeout`).
- `map_action` (type: string; default: `"create_or_update"`) — Controls whether the filter creates a new map entry, updates an existing one, or both (`create`, `update`, or `create_or_update`).
- `push_map_as_event_on_timeout` (type: boolean; default: `false`) — Emit the aggregated state as a new event when the task times out.
- `push_previous_map_as_event` (type: boolean; default: `false`) — Emit the previous task's aggregated state when a new task id is seen (requires events to arrive in order).
- `timeout` (type: number; default: `1800`) — Seconds to retain an idle task before it is considered complete and optionally emitted.
- `timeout_code` (type: string; default: none) — Ruby code run against the new event produced by a timeout flush.
- `timeout_tags` (type: array; default: `[]`) — Tags added to events produced by timeout flushes to help downstream routing.
- `timeout_task_id_field` (type: string; default: none) — Field on the flushed event that stores the originating task id.
- `timeout_timestamp_field` (type: string; default: none) — Timestamp field on the flushed event; uses the last seen event time when set.
- `map_count_warning_threshold` (type: number; default: `5000`) — Logs a warning when the number of in-memory aggregate maps exceeds this threshold.
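When a stream has no explicit end-of-task event but tasks arrive strictly in order, `push_previous_map_as_event` can flush each task's summary as soon as the next task id appears. A hedged sketch (the `session_id` field is an assumption):

```
filter {
  aggregate {
    task_id => "%{session_id}"     # assumed correlating field
    code => "
      map['events'] ||= 0
      map['events'] += 1
      event.cancel()               # drop raw events; keep only the flushed summary
    "
    push_previous_map_as_event => true
    timeout => 300                 # flush the final task after 5 minutes idle
    timeout_task_id_field => "session_id"
  }
}
```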
Example configuration
```
filter {
  aggregate {
    task_id => "%{transaction_id}"
    code => "map['event_count'] ||= 0; map['event_count'] += 1"
    push_map_as_event_on_timeout => true
    timeout => 120
    inactivity_timeout => 30
    timeout_tags => ["aggregated_transaction"]
  }
}
```
Common options configuration
All Logstash filter plugins support these shared options:
- `add_field` (type: hash; default: `{}`) — Adds fields when the filter succeeds. Supports dynamic field names and values.
- `add_tag` (type: array; default: `[]`) — Adds one or more tags when the filter succeeds.
- `enable_metric` (type: boolean; default: `true`) — Enables or disables metric collection for this plugin instance.
- `id` (type: string; default: none) — Sets an explicit plugin instance ID for monitoring and troubleshooting.
- `periodic_flush` (type: boolean; default: `false`) — Calls the filter flush method at regular intervals.
- `remove_field` (type: array; default: `[]`) — Removes fields when the filter succeeds. Supports dynamic field names.
- `remove_tag` (type: array; default: `[]`) — Removes tags when the filter succeeds.
```
filter {
  aggregate {
    add_field => { "pipeline_stage" => "parsed" }
    add_tag => ["parsed", "logstash_filter"]
    enable_metric => true
    id => "my_filter_instance"
    periodic_flush => false
    remove_field => ["tmp_field"]
    remove_tag => ["temporary"]
  }
}
```
Apply in Logit.io
- Open your stack in Logit.io and navigate to Logstash Pipelines.
- In the `filter { ... }` section, add an `aggregate` block.
- Save your pipeline changes, then restart the Logstash pipeline if prompted.
- Send sample events and verify parsed/enriched fields in OpenSearch Dashboards.
Validation checklist
- Confirm the `aggregate` block compiles without syntax errors (for example, by running Logstash with `--config.test_and_exit`).
- Verify expected new/updated fields exist in sample documents.
- Verify unexpected fields are not removed unless explicitly configured.
- Confirm tags added on success/failure align with your alerting and routing rules.
Troubleshooting
- If events are unchanged, verify your filter condition (`if ...`) matches incoming events.
- If the pipeline fails to start, validate braces/quotes and retry with a minimal filter block.
- If throughput drops, reduce expensive operations and test with representative sample volume.
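A minimal block for such a retry might look like the following: it only counts events per task, so if it starts cleanly, the problem lies in the options you removed. The `transaction_id` field is an assumption:

```
filter {
  aggregate {
    task_id => "%{transaction_id}"
    code => "map['count'] ||= 0; map['count'] += 1; event.set('count_so_far', map['count'])"
  }
}
```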
References
- GitHub package: `logstash-filter-aggregate`
- Canonical catalog: /log-management/ingestion-pipeline/logstash-filters-reference