Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## [1.0.29] - 2026-03-20

Added D4DConverter class for converting ROCrateV1_2 into LinkML D4D yaml.
Added a dependency on PyYAML.

The RO-Crate class `fairscape_models.rocrate.ROCrateV1_2` now sets its `context` property by default, using the value of `fairscape_models.fairscape_base.DEFAULT_CONTEXT`.

Added a bound method `generateFileElem` to `fairscape_models.rocrate.ROCrateMetadataElem` to generate the `fairscape_models.rocrate.ROCrateFileElem` required in an ROCrate.
Expand Down
194 changes: 194 additions & 0 deletions fairscape_models/conversion/d4d_converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
from fairscape_models.rocrate import ROCrateV1_2
from fairscape_models.conversion.mapping.d4d import ROCRATE_TO_D4D_MAPPING
from typing import Optional, Dict, Any
import pathlib
import yaml


class D4DConverter():
    """Convert an RO-Crate model into a D4D dictionary and optionally dump it as YAML.

    Typical use is ``convert()`` followed by ``dump()``. The converted
    dictionary is stored on ``d4dOutput``; it stays ``None`` until
    ``convert()`` has succeeded.
    """

    # File name always used for the YAML document written by dump().
    _OUTPUT_FILENAME = "ro-crate-linkml.yaml"

    def __init__(
        self,
        crate: "ROCrateV1_2",
        outputPath: Optional[pathlib.Path] = None
    ):
        """Store the crate to convert and the (optional) output location.

        Args:
            crate: RO-Crate model; only its ``model_dump(by_alias=True)``
                method is used by this class.
            outputPath: Directory or file path used by ``dump()``. May be
                omitted when only ``convert()`` is needed.
        """
        self.crate = crate
        self.outputPath = outputPath
        self.d4dOutput = None

    def find_root_entity(
        self,
    ) -> Optional[dict]:
        """Find the root dataset entity in the RO-Crate.

        The root is the ``@graph`` entity whose ``@id`` matches the ``about``
        reference of the ``ro-crate-metadata.json`` descriptor entity.

        Returns:
            The root entity as a dict, or ``None`` when the descriptor, its
            ``about`` reference, or the referenced entity is missing.
        """
        rocrate_data = self.crate.model_dump(
            by_alias=True
        )
        # "@graph" may be present but null in a dumped model; treat as empty.
        graph = rocrate_data.get("@graph") or []

        # Find the metadata descriptor to get the root ID
        metadata_descriptor = next(
            (
                entity for entity in graph
                if entity.get("@id") == "ro-crate-metadata.json"
            ),
            None,
        )
        if not metadata_descriptor:
            return None

        # "about" is either a {"@id": ...} reference or the bare identifier
        about = metadata_descriptor.get("about", {})
        root_id = about.get("@id") if isinstance(about, dict) else about
        if not root_id:
            return None

        # Find and return the root entity
        return next(
            (entity for entity in graph if entity.get("@id") == root_id),
            None,
        )

    def apply_mapping(
        self,
        source_dict: Dict[str, Any],
        mapping: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Apply the mapping configuration to convert ROCrate data to D4D format.

        Each mapping entry is a spec dict resolved in this priority order:
        ``fixed_value`` (used as-is), ``source_key`` (looked up in
        ``source_dict`` and passed through the optional ``parser`` when the
        value is not ``None``), then ``builder_func`` (called with the whole
        ``source_dict``).

        Args:
            source_dict: Entity to read values from.
            mapping: Target key -> spec dict.

        Returns:
            A dict holding only the target keys whose value is not ``None``.
        """
        result = {}

        for target_key, spec in mapping.items():
            value = None

            if "fixed_value" in spec:
                value = spec["fixed_value"]
            elif "source_key" in spec:
                value = source_dict.get(spec["source_key"])
                if "parser" in spec and value is not None:
                    value = spec["parser"](value)
            elif "builder_func" in spec:
                value = spec["builder_func"](source_dict)

            if value is not None:
                result[target_key] = value

        return result

    def convert(self):
        """Convert the RO-Crate to D4D format and store it on ``d4dOutput``.

        Raises:
            ValueError: If the crate has no resolvable root entity, so there
                is nothing to map.
        """
        root_entity = self.find_root_entity()
        if root_entity is None:
            # Fail with a clear message instead of an AttributeError raised
            # from deep inside apply_mapping.
            raise ValueError(
                "Cannot convert RO-Crate to D4D: no root entity referenced by "
                "'ro-crate-metadata.json' was found in @graph"
            )

        # Apply the mapping, then reshape into the nested D4D structure
        d4d_flat = self.apply_mapping(root_entity, ROCRATE_TO_D4D_MAPPING)
        self.d4dOutput = convert_to_d4d_structure(d4d_flat)

    def dump(self):
        """Write the converted D4D to ``ro-crate-linkml.yaml``.

        The file is placed inside ``outputPath`` when that is a directory,
        otherwise next to it; a path already named ``ro-crate-linkml.yaml`` is
        used unchanged. ``outputPath`` is updated to the file actually written.
        Nothing is written when there is no converted output yet.

        Raises:
            ValueError: If there is output to write but no ``outputPath``.
        """
        if not self.d4dOutput:
            # Nothing converted (or an empty result): keep this a no-op.
            return

        if self.outputPath is None:
            raise ValueError(
                "Cannot dump D4D output: outputPath was not provided"
            )

        # Accept plain string paths as well as pathlib.Path instances.
        target = pathlib.Path(self.outputPath)
        if target.name != self._OUTPUT_FILENAME:
            parent_dir = target if target.is_dir() else target.parent
            target = parent_dir / self._OUTPUT_FILENAME
        self.outputPath = target

        with target.open("w", encoding="utf-8") as outputFile:
            yaml.dump(self.d4dOutput, outputFile)


def convert_to_d4d_structure(d4d_flat: Dict[str, Any]) -> Dict[str, Any]:
    """
    Reshape the flat mapping output into the D4D YAML layout.

    Three groups of keys are handled, in this order:
    core metadata keys are copied through when not ``None``; ``keywords``
    is normalised to a list; and the descriptive keys are turned into lists
    of ``{'description': ...}`` objects. Keys outside these groups are dropped.
    """
    # Core metadata keys, copied verbatim
    passthrough_keys = (
        'id', 'name', 'title', 'description', 'page', 'language', 'version',
        'license', 'doi', 'download_url', 'publisher', 'citation',
        'bytes', 'encoding', 'format', 'hash', 'md5', 'sha256',
        'compression', 'conforms_to', 'created_by', 'created_on',
        'last_updated_on', 'was_derived_from',
    )

    # Keys whose values become lists of objects carrying a description
    described_keys = (
        'purposes',
        'tasks',
        'known_biases',
        'known_limitations',
        'sensitive_elements',
        'collection_mechanisms',
        'collection_timeframes',
        'missing_data_documentation',
        'raw_data_sources',
        'ethical_reviews',
        'human_subject_research',
        'preprocessing_strategies',
        'labeling_strategies',
        'raw_sources',
        'imputation_protocols',
        'annotation_analyses',
        'machine_annotation_tools',
        'future_use_impacts',
        'discouraged_uses',
        'intended_uses',
        'prohibited_uses',
        'distribution_formats',
        'creators',
        'funders',
    )

    structured: Dict[str, Any] = {
        key: d4d_flat[key]
        for key in passthrough_keys
        if d4d_flat.get(key) is not None
    }

    # A comma-separated string is split into trimmed, non-empty entries;
    # a list is kept as-is; any other type is left out.
    raw_keywords = d4d_flat.get('keywords')
    if isinstance(raw_keywords, str):
        trimmed = (piece.strip() for piece in raw_keywords.split(','))
        structured['keywords'] = [piece for piece in trimmed if piece]
    elif isinstance(raw_keywords, list):
        structured['keywords'] = raw_keywords

    for key in described_keys:
        entry = d4d_flat.get(key)
        if entry is None:
            continue

        if isinstance(entry, str):
            # A lone string becomes a one-element list
            structured[key] = [{'description': entry}]
        elif isinstance(entry, list):
            # Wrap string members, keep already-structured members untouched
            wrapped = []
            for member in entry:
                if isinstance(member, str):
                    wrapped.append({'description': member})
                else:
                    wrapped.append(member)
            structured[key] = wrapped
        else:
            structured[key] = entry

    return structured


def dict_representer(dumper, data):
    """Represent a dict as a YAML map built from its ``items()`` view.

    NOTE(review): handing over the items view instead of the dict itself is
    presumably what keeps insertion order in the output; confirm against the
    PyYAML representer behaviour.
    """
    key_value_pairs = data.items()
    yaml_map_tag = 'tag:yaml.org,2002:map'
    return dumper.represent_mapping(yaml_map_tag, key_value_pairs)


def str_representer(dumper, data):
    """Represent a string as a YAML scalar, using ``|`` style when it spans lines."""
    yaml_str_tag = 'tag:yaml.org,2002:str'

    # Single-line strings keep the dumper's default scalar style
    if '\n' not in data:
        return dumper.represent_scalar(yaml_str_tag, data)

    return dumper.represent_scalar(yaml_str_tag, data, style='|')
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "fairscape-models"
version = "1.0.23"
version = "1.0.29"
description = "Fairscape pydantic models"
readme = "README.md"
authors = [
Expand All @@ -20,7 +20,8 @@ dependencies = [
"pydantic",
"pymongo",
"typing",
"mongomock"
"mongomock",
"pyyaml>=6.0.3",
]
requires-python = ">=3.8"

Expand Down
43 changes: 43 additions & 0 deletions tests/test_d4d_converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from fairscape_models.rocrate import ROCrateV1_2
from fairscape_models.conversion.d4d_converter import D4DConverter
import pytest
import pathlib


# Fixture crates live in the "test_rocrates" folder next to this test module
TEST_ROCRATES_PATH = pathlib.Path(__file__).parent.joinpath("test_rocrates")

def find_rocrate_metadata_files(base_path: pathlib.Path):
    """Collect every ro-crate-metadata.json found anywhere below ``base_path``.

    Returns an empty list when ``base_path`` is not an existing directory.
    """
    if base_path.is_dir():
        return [match for match in base_path.rglob("ro-crate-metadata.json")]
    return []


# Discover the fixture crates once at import time; each file becomes one
# parametrized case whose id is its path relative to the fixtures folder
test_files = find_rocrate_metadata_files(TEST_ROCRATES_PATH)
test_ids = [
    str(metadata_path.relative_to(TEST_ROCRATES_PATH))
    for metadata_path in test_files
]


@pytest.mark.parametrize("rocrate_file_path", test_files, ids=test_ids)
def test_validate_test_rocrates(rocrate_file_path: pathlib.Path):
    """Validate one fixture ro-crate-metadata.json and convert it to D4D."""
    print(f"\n--> Validating Test-ROCrate: {rocrate_file_path.relative_to(TEST_ROCRATES_PATH)}")

    # Parse the raw JSON text straight into the RO-Crate model
    raw_json = rocrate_file_path.read_text(encoding='utf-8')
    rocrate_instance = ROCrateV1_2.model_validate_json(raw_json)

    assert rocrate_instance is not None
    assert isinstance(rocrate_instance, ROCrateV1_2)

    # The conversion must leave a dictionary on the converter
    converter = D4DConverter(rocrate_instance)
    converter.convert()

    converted = converter.d4dOutput
    assert converted is not None
    assert isinstance(converted, dict)


Loading
Loading