Source code for pyspark_pipeline_framework.core.config.spark

"""Spark configuration models."""

from dataclasses import dataclass, field

from pyspark_pipeline_framework.core.config.base import SparkDeployMode


[docs] @dataclass class SparkConfig: """Configuration for Apache Spark runtime.""" app_name: str """Spark application name (required)""" master: str = "local[*]" """Spark master URL - local, yarn, k8s URL (default: local[*])""" deploy_mode: SparkDeployMode = SparkDeployMode.CLIENT """Spark deployment mode (default: client)""" driver_memory: str = "2g" """Driver memory allocation (default: 2g)""" driver_cores: int = 1 """Number of cores for driver (default: 1)""" executor_memory: str = "4g" """Executor memory allocation (default: 4g)""" executor_cores: int = 2 """Number of cores per executor (default: 2)""" num_executors: int = 2 """Number of executors (default: 2)""" dynamic_allocation: bool = False """Enable dynamic allocation of executors (default: False)""" spark_conf: dict[str, str] = field(default_factory=dict) """Additional Spark configuration properties (default: {})""" connect_string: str | None = None """Spark Connect remote URL (e.g., 'sc://localhost:15002') for Spark 3.4+""" def __post_init__(self) -> None: """Validate configuration after initialization.""" if not self.app_name: raise ValueError("app_name is required") if self.driver_cores < 1: raise ValueError("driver_cores must be at least 1") if self.executor_cores < 1: raise ValueError("executor_cores must be at least 1") if self.num_executors < 1 and not self.dynamic_allocation: raise ValueError("num_executors must be at least 1 when dynamic_allocation is False")
[docs] def to_spark_conf_dict(self) -> dict[str, str]: """Convert to Spark configuration dictionary. Returns: Dictionary of Spark configuration properties. """ config = { "spark.app.name": self.app_name, "spark.master": self.master, "spark.submit.deployMode": self.deploy_mode.value, "spark.driver.memory": self.driver_memory, "spark.driver.cores": str(self.driver_cores), "spark.executor.memory": self.executor_memory, "spark.executor.cores": str(self.executor_cores), } if not self.dynamic_allocation: config["spark.executor.instances"] = str(self.num_executors) else: config["spark.dynamicAllocation.enabled"] = "true" # Merge additional spark_conf config.update(self.spark_conf) return config