Skip to content

[WIP][SPARK-?????][STREAMING] Make small optimizations to StatefulProcessorApiClient#56786

Draft
funrollloops wants to merge 1 commit into
apache:masterfrom
funrollloops:tws-opt-1
Draft

[WIP][SPARK-?????][STREAMING] Make small optimizations to StatefulProcessorApiClient#56786
funrollloops wants to merge 1 commit into
apache:masterfrom
funrollloops:tws-opt-1

Conversation

@funrollloops

Copy link
Copy Markdown

What changes were proposed in this pull request?

Make two small optimizations to StatefulProcessorApiClient:

  1. Call PickleSerializer() instead of using the default CPickleSerializer (which is CloudPickleSerializer). We don't need the latter since this path does not deal with code objects.
  2. Micro-optimize state value normalization: add a fast-path for primitives, prefer map to generator comprehensions, and move the numpy import and function definition to the top level so it is done once

Why are the changes needed?

Together these changes improve transform with state on a simple rolling-window style benchmark by ~10%.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing unit tests.

Was this patch authored or co-authored using generative AI tooling?

No

@funrollloops funrollloops changed the title Make small optimizations to StatefulProcessorApiClient [SPARK-?????][STREAMING] Make small optimizations to StatefulProcessorApiClient Jun 25, 2026
@funrollloops funrollloops changed the title [SPARK-?????][STREAMING] Make small optimizations to StatefulProcessorApiClient [WIP][SPARK-?????][STREAMING] Make small optimizations to StatefulProcessorApiClient Jun 25, 2026

@HyukjinKwon HyukjinKwon left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 blocking, 0 non-blocking, 1 nit.
The closure-hoisting optimization is good; the serializer swap and the fast-path subclass leak read as unintended behavior changes for a "small optimizations" PR.

Correctness (2)

  • stateful_processor_api_client.py:110: PickleSerializer() drops cloudpickle — CPickleSerializer defaults to CloudPickleSerializer — see inline
  • stateful_processor_api_client.py:45: scalar fast-path returns np.float64/pd.Timestamp unconverted (they subclass float/datetime) — see inline

Nits: 1 minor item (see inline comments).

Verification

Confirmed empirically: issubclass(np.float64, float) and issubclass(pandas.Timestamp, datetime) are both True, while np.int64/np.bool_ are not subclasses of int/bool. So the new fast-path leaks exactly np.float64 and pd.Timestamp (returned as-is, skipping .tolist()/.to_pydatetime()), while np.int64/np.bool_ still correctly reach the np.generic branch.

PR description suggestions

  • Document: why the serializer is changed from CPickleSerializer to PickleSerializer (this is a capability change, not just an optimization) — and whether dropping cloudpickle is intended.
  • Add: a real JIRA id (currently SPARK-?????) and a short note on what is being optimized and how it was measured.

Comment thread python/pyspark/sql/streaming/stateful_processor_api_client.py

def _normalize_state_value(v: Any) -> Any:
# Fast path for common scalar values.
if isinstance(v, (bool, int, float, str, bytes, datetime, NoneType)):

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fast-path runs before the np.generic / to_pydatetime branches, so it short-circuits subclasses of the listed base types. np.float64 is a subclass of float and pandas.Timestamp is a subclass of datetime, so both now return unconverted here, where the old code normalized them to Python float/datetime via .tolist()/.to_pydatetime(). (np.int64/np.bool_ are not subclasses of int/bool, so those are unaffected.)

Use exact-type checks so subclasses fall through to the conversion branches:

Suggested change
if isinstance(v, (bool, int, float, str, bytes, datetime, NoneType)):
if type(v) in (bool, int, float, str, bytes, datetime, type(None)):

import socket
from typing import IO, Any, Dict, List, Union, Optional, Tuple, Iterator, cast
from types import NoneType
from typing import IO, Any, Dict, List, Union, Optional, Tuple, Iterator, cast, TYPE_CHECKING

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TYPE_CHECKING is imported but not used anywhere in the file — flake8 F401 will fail CI. Drop it (or add the intended if TYPE_CHECKING: block):

Suggested change
from typing import IO, Any, Dict, List, Union, Optional, Tuple, Iterator, cast, TYPE_CHECKING
from typing import IO, Any, Dict, List, Union, Optional, Tuple, Iterator, cast

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants