Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,19 @@ jobs:
- macos-latest
python-version:
- '3.11'
- '3.14'
dependencies:
- ''
- '"pydantic<2.12"'
- '"pandas<2" "numpy<2" "xarray<2025.09.0" "dask<2024.7.0"'
- '"pandas<3"'
- '"pandas<4"'
exclude:
# pandas<2 and pydantic<2.12 do not support Python 3.14
- python-version: '3.14'
dependencies: '"pydantic<2.12"'
- python-version: '3.14'
dependencies: '"pandas<2" "numpy<2" "xarray<2025.09.0" "dask<2024.7.0"'

steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
Expand Down
8 changes: 4 additions & 4 deletions ccflow/arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,16 @@ class ArrowFileSystem(ObjectConfig, abc.ABC):
"""


_LOCAL_FILE_SYSTEM = PyObjectPath("pyarrow.fs.LocalFileSystem")
_S3_FILE_SYSTEM = PyObjectPath("pyarrow.fs.S3FileSystem")


class ArrowLocalFileSystem(ArrowFileSystem):
    """Wrapping of pyarrow.fs.LocalFileSystem as a generic ObjectConfig.

    See https://arrow.apache.org/docs/python/generated/pyarrow.fs.LocalFileSystem.html
    """

    # The Literal annotation restricts object_type to the single module-level
    # _LOCAL_FILE_SYSTEM path, which is also the default. The constant is not
    # re-declared in the class body; the module-level definition is the only one.
    object_type: Literal[_LOCAL_FILE_SYSTEM] = _LOCAL_FILE_SYSTEM


Expand All @@ -120,8 +122,6 @@ class ArrowS3FileSystem(ArrowFileSystem):

See https://arrow.apache.org/docs/python/generated/pyarrow.fs.S3FileSystem.html"""

_S3_FILE_SYSTEM = PyObjectPath("pyarrow.fs.S3FileSystem")

object_type: Literal[_S3_FILE_SYSTEM] = _S3_FILE_SYSTEM


Expand Down
35 changes: 27 additions & 8 deletions ccflow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from types import GenericAlias, MappingProxyType
from typing import TYPE_CHECKING, Any, Callable, ClassVar, Dict, Generic, List, Optional, Tuple, Type, TypeVar, Union, get_args, get_origin

import pydantic
from packaging import version

if TYPE_CHECKING:
Expand Down Expand Up @@ -106,19 +107,33 @@ def get_registry_dependencies(self, types: Optional[Tuple["ModelType"]] = None)
return deps


# Pydantic 2 changed nested model serialization to use the annotated type by default,
# which differs from Pydantic 1.
# For pydantic versions before runtime polymorphic serialization, preserve ccflow's
# historical duck-typed (Pydantic 1 compatible) behavior by adjusting annotations via a
# metaclass: any annotation that includes a BaseModel is rewritten so that the new
# annotation contains SerializeAsAny.
# https://docs.pydantic.dev/latest/concepts/serialization/#serializing-with-duck-typing
# https://github.com/pydantic/pydantic/issues/6423
# https://github.com/pydantic/pydantic-core/pull/740
# See https://github.com/pydantic/pydantic/issues/6381 for inspiration on implementation
# NOTE: For this logic to be removed, require https://github.com/pydantic/pydantic-core/pull/1478
from pydantic._internal._model_construction import ModelMetaclass # noqa: E402

_IS_PY39 = version.parse(platform.python_version()) < version.parse("3.10")
_PYDANTIC_VERSION = version.parse(pydantic.__version__)
_USE_RUNTIME_POLYMORPHIC_SERIALIZATION = _PYDANTIC_VERSION >= version.parse("2.13")


def _namespace_annotations(namespaces: Dict[str, Any]) -> dict:
if "__annotations__" in namespaces:
return dict(namespaces["__annotations__"])

try:
import annotationlib
except ImportError:
return {}

annotate = annotationlib.get_annotate_from_class_namespace(namespaces)
if annotate is None:
return {}
return dict(annotationlib.call_annotate_function(annotate, annotationlib.Format.FORWARDREF))


def _adjust_annotations(annotation):
Expand Down Expand Up @@ -149,7 +164,7 @@ def _adjust_annotations(annotation):

class _SerializeAsAnyMeta(ModelMetaclass):
def __new__(self, name: str, bases: Tuple[type], namespaces: Dict[str, Any], **kwargs):
annotations: dict = namespaces.get("__annotations__", {})
annotations = _namespace_annotations(namespaces)

for base in bases:
for base_ in base.__mro__:
Expand All @@ -165,7 +180,10 @@ def __new__(self, name: str, bases: Tuple[type], namespaces: Dict[str, Any], **k
return super().__new__(self, name, bases, namespaces, **kwargs)


class BaseModel(PydanticBaseModel, _RegistryMixin, metaclass=_SerializeAsAnyMeta):
_BASE_MODEL_METACLASS = ModelMetaclass if _USE_RUNTIME_POLYMORPHIC_SERIALIZATION else _SerializeAsAnyMeta


class BaseModel(PydanticBaseModel, _RegistryMixin, metaclass=_BASE_MODEL_METACLASS):
"""BaseModel is a base class for all pydantic models within the ccflow framework.

This gives us a way to add functionality to the framework, including
Expand Down Expand Up @@ -221,6 +239,7 @@ def type_(self) -> PyObjectPath:
# where the default behavior is just to drop the mis-named value. This prevents that
extra="forbid",
ser_json_timedelta="float",
**(dict(polymorphic_serialization=True) if _USE_RUNTIME_POLYMORPHIC_SERIALIZATION else {}),
)

def __str__(self):
Expand Down
4 changes: 3 additions & 1 deletion ccflow/tests/enums/test_enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,5 +129,7 @@ def test_init_no_csp_explicit(self):

importlib.reload(ccflow.enums)

self.assertTrue("csp" in sys.modules)
# csp will only be in sys.modules if it's actually installed
if importlib.util.find_spec("csp") is not None:
self.assertTrue("csp" in sys.modules)
os.environ.pop("CCFLOW_NO_CSP", None)
10 changes: 7 additions & 3 deletions ccflow/tests/enums/test_pydantic.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,16 @@ def test_json_schema_no_csp():
if importlib.util.find_spec("csp"):
pytest.skip("Skipping test because csp installed")

assert MyModel.model_json_schema() == {
schema = MyModel.model_json_schema()
# Python 3.14 changed Enum.__doc__ from "An enumeration." to "An enumeration of <ClassName>"
expected_description = schema["properties"]["enum"]["description"]
assert expected_description in ("An enumeration.", "An enumeration of MyEnum")
assert schema == {
"properties": {
"enum": {"description": "An enumeration.", "enum": ["FIELD1", "FIELD2"], "title": "MyEnum", "type": "string"},
"enum": {"description": expected_description, "enum": ["FIELD1", "FIELD2"], "title": "MyEnum", "type": "string"},
"enum_default": {
"default": "FIELD1",
"description": "An enumeration.",
"description": expected_description,
"enum": ["FIELD1", "FIELD2"],
"title": "MyEnum",
"type": "string",
Expand Down
4 changes: 4 additions & 0 deletions ccflow/tests/examples/test_etl.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import sys
from tempfile import NamedTemporaryFile
from unittest.mock import patch

import pytest

from ccflow.examples.etl.__main__ import main
from ccflow.examples.etl.explain import explain
from ccflow.examples.etl.models import DBModel, LinksModel, RestModel, SiteContext
Expand Down Expand Up @@ -38,6 +41,7 @@ def test_db_model(self):
result = db()
assert result.value == "Data loaded into database"

@pytest.mark.skipif(sys.version_info >= (3, 14), reason="Hydra shell completion help string incompatible with Python 3.14 argparse")
def test_cli(self):
with patch("ccflow.examples.etl.__main__.cfg_run") as mock_cfg_run:
with patch("sys.argv", ["etl", "+callable=extract", "+context=[]"]):
Expand Down
57 changes: 20 additions & 37 deletions ccflow/tests/test_base_serialize.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import pickle
import platform
import unittest
from typing import Annotated, ClassVar, Dict, List, Optional, Type, Union
from typing import Annotated, Dict, List, Optional

import numpy as np
from packaging import version
from pydantic import BaseModel as PydanticBaseModel, ConfigDict, Field, ValidationError

from ccflow import BaseModel, NDArray
Expand Down Expand Up @@ -214,47 +212,32 @@ class C(PydanticBaseModel):
_ = C(extra_field1=1)

def test_serialize_as_any(self):
    """Check that nested fields serialize with their runtime subclass, not the annotated type.

    Each field is annotated with ``A`` but populated with a ``ChildA`` instance. The
    subclass-only ``value`` field must appear in the dump, and validating the dump
    must rebuild an equal model.
    """

    class ChildA(A):
        value: int

    class MyNestedModel(BaseModel):
        a1: A
        a2: Optional[A]
        a3: Dict[str, Optional[List[A]]]
        a4: A | int

    model = MyNestedModel(
        a1=ChildA(value=1),
        a2=ChildA(value=2),
        a3={"child": [ChildA(value=3)], "none": None},
        a4=ChildA(value=4),
    )

    serialized = model.model_dump(mode="python")
    # "value" only exists on ChildA, so its presence shows the runtime type was serialized.
    self.assertEqual(serialized["a1"]["value"], 1)
    self.assertEqual(serialized["a2"]["value"], 2)
    self.assertEqual(serialized["a3"]["child"][0]["value"], 3)
    self.assertEqual(serialized["a4"]["value"], 4)
    self.assertEqual(MyNestedModel.model_validate(serialized), model)

def test_pickle_consistency(self):
model = MultiAttributeModel(z=1, y="test", x=3.14, w=True)
serialized = pickle.dumps(model)
serialized = pickle.dumps(model, protocol=4)
# Hard code the pickled form of the model because it shouldn't change from run to run
# (as it would normally in pydantic because of https://github.com/pydantic/pydantic/issues/11603)
# This is generated on Linux/Python 3.11 - might need to have version specific values if it changes.
Expand Down
2 changes: 1 addition & 1 deletion ccflow/tests/test_resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def test_three_element_tuple(self):

with pytest.raises(InterpolationResolutionError) as exc_info:
OmegaConf.to_container(config, resolve=True)
assert "too many values to unpack (expected 2)" in str(exc_info.value)
assert "too many values to unpack" in str(exc_info.value)


class TestTrimNullValuesResolver:
Expand Down
5 changes: 5 additions & 0 deletions ccflow/tests/utils/test_tokenize.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import enum as _enum
import re
import sys
from collections import OrderedDict
from datetime import date, datetime, time, timedelta
from decimal import Decimal
Expand Down Expand Up @@ -1210,6 +1211,10 @@ def test_compile_first_const_not_stripped(self):
c2 = compile("x = 'bar'", "<test>", "exec")
assert tokenize(c1) != tokenize(c2)

@pytest.mark.skipif(
sys.version_info >= (3, 14),
reason="Python 3.14 removes the None docstring slot from co_consts, causing bytecode index differences",
)
def test_function_docstring_still_stripped(self):
# Conversely, _hash_function_bytecode must still ignore docstrings on function bodies.
def with_doc():
Expand Down
39 changes: 34 additions & 5 deletions ccflow/utils/tokenize.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ def _sha256_hexdigest(*parts: bytes | str) -> str:
# ContextVar (rather than threading.local) so the visited set auto-isolates across asyncio tasks.
_visited: contextvars.ContextVar[Optional[set]] = contextvars.ContextVar("_ccflow_normalize_visited", default=None)

# Tracks functions currently being hashed to detect circular closure references.
_hashing_functions: contextvars.ContextVar[Optional[set]] = contextvars.ContextVar("_ccflow_hashing_functions", default=None)


def _with_cycle_check(obj: Any, build: Callable[[], Any]) -> Any:
"""Invoke ``build`` with object-identity cycle detection, returning ``("__cycle__", type_name)`` on re-entry."""
Expand Down Expand Up @@ -82,6 +85,14 @@ def _(obj):

Unknown types fall back to a ``cloudpickle``-based digest, raising ``TypeError`` on pickling failure.
"""
# Python 3.14 exposes internal objects (e.g. _abc._abc_data, pydantic._internal.*,
# pydantic_core._pydantic_core.*) in function closures of ABC-derived classes.
# These are not behavior-relevant; produce a stable token keyed by module + qualname
# so they don't affect hashing.
obj_module = getattr(type(obj), "__module__", "") or ""
if obj_module.startswith("_") or "._" in obj_module:
return ("__internal__", obj_module, type(obj).__qualname__)

try:
import cloudpickle
except ImportError as exc: # pragma: no cover - defensive
Expand Down Expand Up @@ -245,7 +256,21 @@ def _normalize_code(obj):

@normalize_token.register(type(lambda: None))  # FunctionType
def _normalize_function(obj):
    """Return a ``("__function__", digest)`` token for a plain Python function.

    The ids of functions currently being hashed are kept in the ``_hashing_functions``
    context variable. If ``obj`` is already in that set, a
    ``("__function_cycle__", qualname)`` marker is returned and the function is not
    hashed again (``"?"`` stands in when ``obj`` has no ``__qualname__``).
    NOTE(review): re-entry presumably happens when ``_hash_function_bytecode``
    normalizes closure contents that refer back to ``obj`` - confirm against that helper.
    """
    seen = _hashing_functions.get()
    created = seen is None
    if created:
        # Outermost call in this context: install a fresh tracking set.
        seen = set()
        token = _hashing_functions.set(seen)
    elif id(obj) in seen:
        return ("__function_cycle__", getattr(obj, "__qualname__", "?"))
    seen.add(id(obj))
    try:
        return ("__function__", _hash_function_bytecode(obj))
    finally:
        # Always unwind the tracking state, even if hashing raised.
        seen.discard(id(obj))
        if created:
            _hashing_functions.reset(token)


@normalize_token.register(MethodType)
Expand Down Expand Up @@ -452,9 +477,13 @@ def _hash_function_bytecode(func: Callable) -> Optional[str]:
return None
code = unwrapped.__code__
consts = code.co_consts
# Function code starts with the docstring slot (a str when present, None when absent). Strip it
# so adding/removing a docstring doesn't change the behavior token.
if consts and isinstance(consts[0], (str, type(None))):
# Strip the docstring constant if present. In Python < 3.14, a None sentinel occupies
# co_consts[0] when there is no docstring; in Python >= 3.14 that slot is absent.
# Only strip when the first constant actually matches the function's __doc__.
doc = getattr(unwrapped, "__doc__", None)
if doc is not None and consts and consts[0] == doc:
consts = consts[1:]
elif consts and consts[0] is None:
consts = consts[1:]
code_canonical = (
"code",
Expand Down Expand Up @@ -511,7 +540,7 @@ def _collect_methods(cls: type) -> List[Tuple[str, Callable]]:
seen_names = set()
methods = []
for klass in cls.__mro__:
if klass is object:
if klass is object or klass is _PydanticBaseModel:
break
for name, value in klass.__dict__.items():
if name in seen_names:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ develop = [
# Full deps
"bleach",
"cexprtk",
"csp>=0.8.0,<1",
"csp>=0.8.0,<1; python_version < '3.14'", # TODO: remove when 3.14 wheels are distributed
"duckdb",
"IPython",
"panel",
Expand Down
Loading