diff --git a/python/pyspark/sql/_typing.pyi b/python/pyspark/sql/_typing.pyi index cf0a1d522118f..ed62d85dc29a3 100644 --- a/python/pyspark/sql/_typing.pyi +++ b/python/pyspark/sql/_typing.pyi @@ -23,6 +23,7 @@ from typing import ( List, Optional, Tuple, + TypedDict, TypeVar, Union, ) @@ -85,4 +86,8 @@ class UserDefinedFunctionLike(Protocol): def __call__(self, *args: ColumnOrName) -> Column: ... def asNondeterministic(self) -> UserDefinedFunctionLike: ... -ProfileResults = Dict[Union[int, str], Tuple[Optional[pstats.Stats], Optional[CodeMapDict]]] +class ProfileResult(TypedDict, total=False): + perf: pstats.Stats + memory: CodeMapDict + +ProfileResults = Dict[Union[int, str], ProfileResult] diff --git a/python/pyspark/sql/connect/profiler.py b/python/pyspark/sql/connect/profiler.py index b8825cf5678eb..681287c40afc0 100644 --- a/python/pyspark/sql/connect/profiler.py +++ b/python/pyspark/sql/connect/profiler.py @@ -29,7 +29,7 @@ class ConnectProfilerCollector(ProfilerCollector): def __init__(self) -> None: super().__init__() - self._value = ProfileResultsParam.zero(None) + self._value = ProfileResultsParam.zero({}) @property def _profile_results(self) -> "ProfileResults": diff --git a/python/pyspark/sql/profiler.py b/python/pyspark/sql/profiler.py index 88f48070f2058..0b0c038c8f72e 100644 --- a/python/pyspark/sql/profiler.py +++ b/python/pyspark/sql/profiler.py @@ -54,35 +54,32 @@ from pyspark.sql._typing import ProfileResults -class _ProfileResultsParam(AccumulatorParam[Optional["ProfileResults"]]): +class _ProfileResultsParam(AccumulatorParam["ProfileResults"]): """ AccumulatorParam for profilers. """ @staticmethod - def zero(value: Optional["ProfileResults"]) -> Optional["ProfileResults"]: - return value + def zero(value: "ProfileResults") -> "ProfileResults": + return {} @staticmethod - def addInPlace( - value1: Optional["ProfileResults"], value2: Optional["ProfileResults"] - ) -> Optional["ProfileResults"]: - if value1 is None or len(value1) == 0: - value1 = {} - if value2 is None or len(value2) == 0: - value2 = {} - - value = value1.copy() - for key, (perf, mem, *_) in value2.items(): - if key in value1: - orig_perf, orig_mem, *_ = value1[key] + def addInPlace(value1: "ProfileResults", value2: "ProfileResults") -> "ProfileResults": + for key, result in value2.items(): + if key not in value1: + value1[key] = result else: - orig_perf, orig_mem = (PStatsParam.zero(None), MemUsageParam.zero(None)) - value[key] = ( - PStatsParam.addInPlace(orig_perf, perf), - MemUsageParam.addInPlace(orig_mem, mem), - ) - return value + perf = PStatsParam.addInPlace( + value1[key].get("perf", None), result.get("perf", None) + ) + if perf is not None: + value1[key]["perf"] = perf + memory = MemUsageParam.addInPlace( + value1[key].get("memory", None), result.get("memory", None) + ) + if memory is not None: + value1[key]["memory"] = memory + return value1 ProfileResultsParam = _ProfileResultsParam() @@ -94,7 +91,7 @@ class WorkerPerfProfiler: """ def __init__( - self, accumulator: Accumulator[Optional["ProfileResults"]], result_key: Union[int, str] + self, accumulator: Accumulator["ProfileResults"], result_key: Union[int, str] ) -> None: self._accumulator = accumulator self._profiler = cProfile.Profile() @@ -111,7 +108,7 @@ def save(self) -> None: # make it picklable st.stream = None # type: ignore[attr-defined] st.strip_dirs() - self._accumulator.add({self._result_key: (st, None)}) + self._accumulator.add({self._result_key: {"perf": st}}) def __enter__(self) -> "WorkerPerfProfiler": self.start() @@ -134,7 +131,7 @@ class WorkerMemoryProfiler: def __init__( self, - accumulator: Accumulator[Optional["ProfileResults"]], + accumulator: Accumulator["ProfileResults"], result_key: Union[int, str], func_or_code: Union[Callable, CodeType], ) -> None: @@ -159,7 +156,7 @@ def save(self) -> None: filename: list(line_iterator) for filename, line_iterator in self._profiler.code_map.items() } - self._accumulator.add({self._result_key: (None, codemap_dict)}) + self._accumulator.add({self._result_key: {"memory": codemap_dict}}) def __enter__(self) -> "WorkerMemoryProfiler": self.start() @@ -226,9 +223,9 @@ def show(id: Union[int, str]) -> None: def _perf_profile_results(self) -> Dict[Union[int, str], pstats.Stats]: with self._lock: return { - result_id: perf - for result_id, (perf, _, *_) in self._profile_results.items() - if perf is not None + result_id: result["perf"] + for result_id, result in self._profile_results.items() + if result.get("perf", None) is not None } def show_memory_profiles(self, id: Optional[Union[int, str]] = None) -> None: @@ -272,9 +269,9 @@ def show(id: Union[int, str]) -> None: def _memory_profile_results(self) -> Dict[Union[int, str], CodeMapDict]: with self._lock: return { - result_id: mem - for result_id, (_, mem, *_) in self._profile_results.items() - if mem is not None + result_id: result["memory"] + for result_id, result in self._profile_results.items() + if result.get("memory", None) is not None } @property @@ -368,15 +365,14 @@ def clear_perf_profiles(self, id: Optional[Union[int, str]] = None) -> None: with self._lock: if id is not None: if id in self._profile_results: - perf, mem, *_ = self._profile_results[id] - self._profile_results[id] = (None, mem, *_) - if mem is None: - self._profile_results.pop(id, None) + self._profile_results[id].pop("perf", None) + if not self._profile_results[id]: + self._profile_results.pop(id) else: - for id, (perf, mem, *_) in list(self._profile_results.items()): - self._profile_results[id] = (None, mem, *_) - if mem is None: - self._profile_results.pop(id, None) + for id in list(self._profile_results.keys()): + self._profile_results[id].pop("perf", None) + if not self._profile_results[id]: + self._profile_results.pop(id) def clear_memory_profiles(self, id: Optional[Union[int, str]] = None) -> None: """ @@ -393,15 +389,14 @@ def clear_memory_profiles(self, id: Optional[Union[int, str]] = None) -> None: with self._lock: if id is not None: if id in self._profile_results: - perf, mem, *_ = self._profile_results[id] - self._profile_results[id] = (perf, None, *_) - if perf is None: - self._profile_results.pop(id, None) + self._profile_results[id].pop("memory", None) + if not self._profile_results[id]: + self._profile_results.pop(id) else: - for id, (perf, mem, *_) in list(self._profile_results.items()): - self._profile_results[id] = (perf, None, *_) - if perf is None: - self._profile_results.pop(id, None) + for id in list(self._profile_results.keys()): + self._profile_results[id].pop("memory", None) + if not self._profile_results[id]: + self._profile_results.pop(id) class AccumulatorProfilerCollector(ProfilerCollector): @@ -411,7 +406,7 @@ def __init__(self) -> None: self._accumulator = _accumulatorRegistry[SpecialAccumulatorIds.SQL_UDF_PROFIER] else: self._accumulator = Accumulator( - SpecialAccumulatorIds.SQL_UDF_PROFIER, None, ProfileResultsParam + SpecialAccumulatorIds.SQL_UDF_PROFIER, {}, ProfileResultsParam ) @property diff --git a/python/pyspark/sql/tests/test_udf_profiler.py b/python/pyspark/sql/tests/test_udf_profiler.py index 759aa9c683639..2900860d43d2c 100644 --- a/python/pyspark/sql/tests/test_udf_profiler.py +++ b/python/pyspark/sql/tests/test_udf_profiler.py @@ -752,7 +752,7 @@ def test_perf_profiler_clear(self): class UDFProfiler2Tests(UDFProfiler2TestsMixin, ReusedSQLTestCase): def setUp(self) -> None: super().setUp() - self.spark._profiler_collector._accumulator._value = None + self.spark._profiler_collector._accumulator._value = {} if __name__ == "__main__": diff --git a/python/pyspark/sql/worker/utils.py b/python/pyspark/sql/worker/utils.py index 406894fc275a6..58f2dbb67f648 100644 --- a/python/pyspark/sql/worker/utils.py +++ b/python/pyspark/sql/worker/utils.py @@ -67,7 +67,7 @@ def worker_run(main: Callable, infile: IO, outfile: IO) -> None: _accumulatorRegistry.clear() accumulator = _deserialize_accumulator( - SpecialAccumulatorIds.SQL_UDF_PROFIER, None, ProfileResultsParam + SpecialAccumulatorIds.SQL_UDF_PROFIER, {}, ProfileResultsParam ) if main.__module__ == "__main__": diff --git a/python/pyspark/tests/test_memory_profiler.py b/python/pyspark/tests/test_memory_profiler.py index 8664688c3b8e3..5d9e41f16f06e 100644 --- a/python/pyspark/tests/test_memory_profiler.py +++ b/python/pyspark/tests/test_memory_profiler.py @@ -612,7 +612,7 @@ def test_profilers_clear(self): class MemoryProfiler2Tests(MemoryProfiler2TestsMixin, ReusedSQLTestCase): def setUp(self) -> None: super().setUp() - self.spark._profiler_collector._accumulator._value = None + self.spark._profiler_collector._accumulator._value = {} if __name__ == "__main__": diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 4e1de29426bf3..4084b1c8f9cfc 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -1069,7 +1069,7 @@ def wrap_perf_profiler(f, eval_type, result_id): from pyspark.sql.profiler import ProfileResultsParam, WorkerPerfProfiler accumulator = _deserialize_accumulator( - SpecialAccumulatorIds.SQL_UDF_PROFIER, None, ProfileResultsParam + SpecialAccumulatorIds.SQL_UDF_PROFIER, {}, ProfileResultsParam ) if _is_iter_based(eval_type): @@ -1103,7 +1103,7 @@ def wrap_memory_profiler(f, eval_type, result_id): return f accumulator = _deserialize_accumulator( - SpecialAccumulatorIds.SQL_UDF_PROFIER, None, ProfileResultsParam + SpecialAccumulatorIds.SQL_UDF_PROFIER, {}, ProfileResultsParam ) if _is_iter_based(eval_type):