[WIP][SPARK-?????][STREAMING] Make small optimizations to StatefulProcessorApiClient#56786
[WIP][SPARK-?????][STREAMING] Make small optimizations to StatefulProcessorApiClient#56786funrollloops wants to merge 1 commit into
Conversation
HyukjinKwon
left a comment
There was a problem hiding this comment.
2 blocking, 0 non-blocking, 1 nit.
The closure-hoisting optimization is good; the serializer swap and the fast-path subclass leak read as unintended behavior changes for a "small optimizations" PR.
Correctness (2)
- stateful_processor_api_client.py:110:
PickleSerializer()drops cloudpickle —CPickleSerializerdefaults toCloudPickleSerializer— see inline - stateful_processor_api_client.py:45: scalar fast-path returns
np.float64/pd.Timestampunconverted (they subclassfloat/datetime) — see inline
Nits: 1 minor item (see inline comments).
Verification
Confirmed empirically: issubclass(np.float64, float) and issubclass(pandas.Timestamp, datetime) are both True, while np.int64/np.bool_ are not subclasses of int/bool. So the new fast-path leaks exactly np.float64 and pd.Timestamp (returned as-is, skipping .tolist()/.to_pydatetime()), while np.int64/np.bool_ still correctly reach the np.generic branch.
PR description suggestions
- Document: why the serializer is changed from
CPickleSerializertoPickleSerializer(this is a capability change, not just an optimization) — and whether dropping cloudpickle is intended. - Add: a real JIRA id (currently
SPARK-?????) and a short note on what is being optimized and how it was measured.
|
|
||
| def _normalize_state_value(v: Any) -> Any: | ||
| # Fast path for common scalar values. | ||
| if isinstance(v, (bool, int, float, str, bytes, datetime, NoneType)): |
There was a problem hiding this comment.
This fast-path runs before the np.generic / to_pydatetime branches, so it short-circuits subclasses of the listed base types. np.float64 is a subclass of float and pandas.Timestamp is a subclass of datetime, so both now return unconverted here, where the old code normalized them to Python float/datetime via .tolist()/.to_pydatetime(). (np.int64/np.bool_ are not subclasses of int/bool, so those are unaffected.)
Use exact-type checks so subclasses fall through to the conversion branches:
| if isinstance(v, (bool, int, float, str, bytes, datetime, NoneType)): | |
| if type(v) in (bool, int, float, str, bytes, datetime, type(None)): |
| import socket | ||
| from typing import IO, Any, Dict, List, Union, Optional, Tuple, Iterator, cast | ||
| from types import NoneType | ||
| from typing import IO, Any, Dict, List, Union, Optional, Tuple, Iterator, cast, TYPE_CHECKING |
There was a problem hiding this comment.
TYPE_CHECKING is imported but not used anywhere in the file — flake8 F401 will fail CI. Drop it (or add the intended if TYPE_CHECKING: block):
| from typing import IO, Any, Dict, List, Union, Optional, Tuple, Iterator, cast, TYPE_CHECKING | |
| from typing import IO, Any, Dict, List, Union, Optional, Tuple, Iterator, cast |
What changes were proposed in this pull request?
Make two small optimizations to StatefulProcessorApiClient:
mapto generator comprehensions, and move the numpy import and function definition to the top level so it is done onceWhy are the changes needed?
Together these changes improve transform with state on a simple rolling-window style benchmark by ~10%.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing unit tests.
Was this patch authored or co-authored using generative AI tooling?
No