Metrics (core.metrics)

Registry

Meter registry protocol and default implementation.

Provides a pluggable abstraction for recording counters, gauges, and timers. Implementations can export metrics to Prometheus, OpenTelemetry, or any other backend.

The InMemoryRegistry stores all metrics in memory and is suitable for testing or lightweight use.

class pyspark_pipeline_framework.core.metrics.registry.MeterRegistry(*args, **kwargs)[source]

Bases: Protocol

Protocol for recording application metrics.

Implementations receive metric data from pipeline hooks and can export it to any observability backend.

All methods must be safe to call from multiple threads.
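Because MeterRegistry is a Protocol, a backend should only need to provide the four methods listed below; inheritance is not required. The following is a minimal sketch of that idea, not code from the framework. MeterRegistryLike is a local stand-in that mirrors the documented signatures (decorated with runtime_checkable for the demonstration only; whether the real protocol is runtime-checkable is not stated here), and EventLogRegistry is a hypothetical backend.

```python
import threading
from typing import Any, Optional, Protocol, runtime_checkable

Tags = Optional[dict[str, str]]


@runtime_checkable
class MeterRegistryLike(Protocol):
    """Local stand-in that mirrors the four documented MeterRegistry methods."""

    def counter(self, name: str, value: float = 1.0, tags: Tags = None) -> None: ...
    def gauge(self, name: str, value: float, tags: Tags = None) -> None: ...
    def timer(self, name: str, duration_ms: float, tags: Tags = None) -> None: ...
    def get_metrics(self) -> dict[str, Any]: ...


class EventLogRegistry:
    """Hypothetical backend: keeps every recorded event in a list.

    It does not inherit from the protocol; matching method names is enough.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()  # guards _events across threads
        self._events: list[dict[str, Any]] = []

    def _record(self, kind: str, name: str, value: float, tags: Tags) -> None:
        event = {"kind": kind, "name": name, "value": float(value), "tags": dict(tags or {})}
        with self._lock:
            self._events.append(event)

    def counter(self, name: str, value: float = 1.0, tags: Tags = None) -> None:
        self._record("counter", name, value, tags)

    def gauge(self, name: str, value: float, tags: Tags = None) -> None:
        self._record("gauge", name, value, tags)

    def timer(self, name: str, duration_ms: float, tags: Tags = None) -> None:
        self._record("timer", name, duration_ms, tags)

    def get_metrics(self) -> dict[str, Any]:
        # Snapshot keyed by metric name; the per-name value is this sketch's own choice.
        with self._lock:
            events = list(self._events)
        snapshot: dict[str, Any] = {}
        for event in events:
            snapshot.setdefault(event["name"], []).append(event)
        return snapshot


registry = EventLogRegistry()
registry.counter("ppf.component.retries", tags={"component": "loader"})
registry.gauge("ppf.pipeline.active_components", 3)
registry.timer("ppf.component.duration", 12.5)
```

A real exporter would forward each call to its client library instead of appending to a list; the lock is there because the protocol asks for thread-safe methods.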

counter(name, value=1.0, tags=None)[source]

Increment a counter metric.

Parameters:
  • name (str) – Metric name (e.g. "ppf.component.retries").

  • value (float) – Amount to increment by.

  • tags (dict[str, str] | None) – Optional key-value tags for dimensionality.

Return type:

None

gauge(name, value, tags=None)[source]

Set a gauge metric to an absolute value.

Parameters:
  • name (str) – Metric name (e.g. "ppf.pipeline.active_components").

  • value (float) – Current value.

  • tags (dict[str, str] | None) – Optional key-value tags for dimensionality.

Return type:

None

timer(name, duration_ms, tags=None)[source]

Record a timing measurement.

Parameters:
  • name (str) – Metric name (e.g. "ppf.component.duration").

  • duration_ms (float) – Duration in milliseconds.

  • tags (dict[str, str] | None) – Optional key-value tags for dimensionality.

Return type:

None
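Since timer() takes a duration in milliseconds rather than measuring anything itself, callers have to do the measuring. One way this might look is a small context manager; timed is a hypothetical helper and RecordingRegistry is a stand-in that implements only timer(), neither is part of the documented API.

```python
import time
from contextlib import contextmanager
from typing import Iterator, Optional


class RecordingRegistry:
    """Stand-in that only implements timer(); it keeps the calls it receives."""

    def __init__(self) -> None:
        self.timings: list[tuple[str, float, Optional[dict[str, str]]]] = []

    def timer(self, name, duration_ms, tags=None) -> None:
        self.timings.append((name, duration_ms, tags))


@contextmanager
def timed(registry, name: str, tags: Optional[dict[str, str]] = None) -> Iterator[None]:
    """Hypothetical helper: report the wall-clock time of the with-block in ms."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # perf_counter() returns seconds; timer() expects milliseconds.
        duration_ms = (time.perf_counter() - start) * 1000.0
        registry.timer(name, duration_ms, tags)


registry = RecordingRegistry()
with timed(registry, "ppf.component.duration", tags={"component": "loader"}):
    sum(range(10_000))  # placeholder for real work
```

The try/finally means a duration is recorded even when the block raises, which is usually what you want for failure analysis.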

get_metrics()[source]

Return a snapshot of all recorded metrics.

Returns:

A dictionary keyed by metric name. The value structure is implementation-defined.

Return type:

dict[str, Any]

class pyspark_pipeline_framework.core.metrics.registry.InMemoryRegistry[source]

Bases: object

Thread-safe in-memory metrics registry.

Stores counters, gauges, and timer totals/counts in memory. Useful for testing, local debugging, and as the default registry when no external backend is configured.
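To make the description above concrete, here is a rough sketch of how a thread-safe in-memory registry of this kind could be built. It is not the InMemoryRegistry source: SketchRegistry, the _key helper, and the choice to identify a series by name plus sorted tag items are all assumptions made for illustration.

```python
import threading
from collections import defaultdict
from typing import Optional

Tags = Optional[dict[str, str]]
Key = tuple[str, tuple[tuple[str, str], ...]]


def _key(name: str, tags: Tags) -> Key:
    # Sort tag items so that tag order does not create distinct series.
    return (name, tuple(sorted((tags or {}).items())))


class SketchRegistry:
    """Illustrative only; not the InMemoryRegistry implementation."""

    def __init__(self) -> None:
        self._lock = threading.Lock()  # one lock guards all four tables
        self._counters: dict[Key, float] = defaultdict(float)
        self._gauges: dict[Key, float] = {}
        self._timer_totals: dict[Key, float] = defaultdict(float)
        self._timer_counts: dict[Key, int] = defaultdict(int)

    def counter(self, name: str, value: float = 1.0, tags: Tags = None) -> None:
        with self._lock:
            self._counters[_key(name, tags)] += value  # counters accumulate

    def gauge(self, name: str, value: float, tags: Tags = None) -> None:
        with self._lock:
            self._gauges[_key(name, tags)] = value  # gauges overwrite

    def timer(self, name: str, duration_ms: float, tags: Tags = None) -> None:
        key = _key(name, tags)
        with self._lock:
            self._timer_totals[key] += duration_ms
            self._timer_counts[key] += 1

    def get_counter(self, name: str, tags: Tags = None) -> float:
        with self._lock:
            return self._counters.get(_key(name, tags), 0.0)

    def get_gauge(self, name: str, tags: Tags = None) -> Optional[float]:
        with self._lock:
            return self._gauges.get(_key(name, tags))

    def get_timer_total(self, name: str, tags: Tags = None) -> float:
        with self._lock:
            return self._timer_totals.get(_key(name, tags), 0.0)

    def get_timer_count(self, name: str, tags: Tags = None) -> int:
        with self._lock:
            return self._timer_counts.get(_key(name, tags), 0)


registry = SketchRegistry()


def worker() -> None:
    for _ in range(1000):
        registry.counter("ppf.component.retries")


# Eight threads increment the same counter concurrently.
threads = [threading.Thread(target=worker) for _ in range(8)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

total = registry.get_counter("ppf.component.retries")
```

With the lock in place, no increments are lost and total ends up at 8000.0. A single coarse lock keeps the sketch simple; a real implementation may well choose a different locking strategy.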

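counter(name, value=1.0, tags=None)[source]

Parameters:
  • name (str)

  • value (float)

  • tags (dict[str, str] | None)

Return type:

None

gauge(name, value, tags=None)[source]

Parameters:
  • name (str)

  • value (float)

  • tags (dict[str, str] | None)

Return type:

None

timer(name, duration_ms, tags=None)[source]

Parameters:
  • name (str)

  • duration_ms (float)

  • tags (dict[str, str] | None)

Return type:

None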

get_metrics()[source]

Return a snapshot of all recorded metrics.

Returns:

A dictionary with the keys "counters", "gauges", and "timers". Each key maps to a dict, keyed by metric name, whose values hold the aggregated data for that metric.

Return type:

dict[str, Any]

get_counter(name, tags=None)[source]

Convenience accessor for the current value of a counter.

Parameters:
  • name (str) – Counter name.

  • tags (dict[str, str] | None) – Optional tags to match.

Returns:

Current counter value, or 0.0 if not found.

Return type:

float

get_gauge(name, tags=None)[source]

Convenience accessor for the current value of a gauge.

Parameters:
  • name (str) – Gauge name.

  • tags (dict[str, str] | None) – Optional tags to match.

Returns:

Current gauge value, or None if not set.

Return type:

float | None

get_timer_total(name, tags=None)[source]

Convenience accessor for the total duration recorded by a timer.

Parameters:
  • name (str) – Timer name.

  • tags (dict[str, str] | None) – Optional tags to match.

Returns:

Total recorded duration in ms, or 0.0 if not found.

Return type:

float

get_timer_count(name, tags=None)[source]

Convenience accessor for the number of recordings made by a timer.

Parameters:
  • name (str) – Timer name.

  • tags (dict[str, str] | None) – Optional tags to match.

Returns:

Number of recordings, or 0 if not found.

Return type:

int
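The total and the count can be combined into a mean duration. Because get_timer_count() returns 0 for a timer that was never recorded, the division needs a guard. The sketch below assumes only the two accessors documented above; mean_duration_ms is a hypothetical helper and StubTimers is a stand-in used so the example runs on its own.

```python
from typing import Optional


class StubTimers:
    """Stand-in exposing only the two documented timer accessors."""

    def __init__(self, total_ms: float, count: int) -> None:
        self._total_ms = total_ms
        self._count = count

    def get_timer_total(self, name, tags=None) -> float:
        return self._total_ms

    def get_timer_count(self, name, tags=None) -> int:
        return self._count


def mean_duration_ms(
    registry, name: str, tags: Optional[dict[str, str]] = None
) -> Optional[float]:
    """Hypothetical helper: average duration in ms, or None if nothing was recorded."""
    count = registry.get_timer_count(name, tags)
    if count == 0:
        return None  # no recordings, so there is no meaningful mean
    return registry.get_timer_total(name, tags) / count


mean = mean_duration_ms(StubTimers(total_ms=30.0, count=4), "ppf.component.duration")
empty = mean_duration_ms(StubTimers(total_ms=0.0, count=0), "ppf.component.duration")
```

Returning None for the empty case mirrors get_gauge(), which also uses None to signal "not set" rather than a misleading 0.0.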

reset()[source]

Clear all recorded metrics.

Return type:

None