diff --git a/README.md b/README.md index 6553460..548f8f8 100644 --- a/README.md +++ b/README.md @@ -236,6 +236,22 @@ directory = "/path/to/holistic/directory" pose = load_MediaPipe_directory(directory, fps=24, width=1000, height=1000) ``` +#### 7. Conversion from JSON poses to `.pose` format + +The library supports converting pose estimation outputs stored as `.json` files into the `.pose` format via the `json_to_pose` utility. + +> **Note** +> - At the moment, `json_to_pose` only supports JSON files produced by [AlphaPose](https://github.com/MVIG-SJTU/AlphaPose) models with **133 keypoints**. +> - Metadata such as FPS, width, and height can be automatically extracted from the original RGB video if provided. + +**Example usage:** + +```bash +json_to_pose -i alphapose.json -o alphapose.pose --format alphapose +json_to_pose -i alphapose.json -o alphapose.pose --original-video video.mp4 --format alphapose +``` + + ### Running Tests: To ensure the integrity of the toolkit, you can run tests using Bazel: diff --git a/src/python/pose_format/bin/json_to_pose.py b/src/python/pose_format/bin/json_to_pose.py new file mode 100644 index 0000000..081bfdd --- /dev/null +++ b/src/python/pose_format/bin/json_to_pose.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python +import argparse +import os + +from simple_video_utils.metadata import video_metadata +from simple_video_utils.frames import read_frames_exact +from pose_format.utils.alphapose import load_alphapose_wholebody_from_json +from typing import Optional + +def json_to_pose( + input_path: str, + output_path: str, + original_video_path: Optional[str], + format: str): + """ + Convert a pose estimation .json file into a .pose file. + + Parameters + ---------- + input_path : str + Path to the input .json file. + output_path : str + Path where the output .pose file will be saved. + original_video_path : str or None, optional + Path to the original RGB video to obtain metadata. 
+ If None, metadata is taken from the .json file when present; otherwise the default values are used. + """ + + kwargs = {} + if original_video_path is not None: + # Load video metadata + print('Obtaining metadata from video ...') + metadata = video_metadata(original_video_path) + kwargs["fps"] = metadata.fps + kwargs["width"] = metadata.width + kwargs["height"] = metadata.height + + # Convert the JSON keypoints into a Pose object + print('Converting .json to .pose pose-format ...') + if format == 'alphapose': + pose = load_alphapose_wholebody_from_json( + input_path=input_path, + **kwargs # only includes keys if video metadata was found + ) + else: + raise NotImplementedError(f'Pose format {format} not supported') + + # Write + print('Saving to disk ...') + with open(output_path, "wb") as f: + pose.write(f) + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('-i', required=True, type=str, help='Path to the input .json file.') + parser.add_argument('-o', required=True, type=str, help='Path where the output .pose file will be saved.') + parser.add_argument( + '--original-video', + type=str, + default=None, + help=( + "Path to the original RGB video used for metadata extraction. " + "If None, metadata is taken from the JSON file if available, " + "otherwise default width/height/FPS values are used." + ) + ) + parser.add_argument('--format', + choices=['alphapose'], + default='alphapose', + type=str, + help='original type of the .json pose estimation') + args = parser.parse_args() + + if not os.path.exists(args.i): + raise FileNotFoundError(f"Input JSON file {args.i} not found") + + json_to_pose(args.i, args.o, args.original_video, args.format) + + # pip install . && json_to_pose -i alphapose.json -o alphapose.pose --format alphapose + # pip install . 
&& json_to_pose -i alphapose.json -o alphapose.pose --original-video video.mp4 --format alphapose \ No newline at end of file diff --git a/src/python/pose_format/utils/alphapose.py b/src/python/pose_format/utils/alphapose.py new file mode 100644 index 0000000..5798a63 --- /dev/null +++ b/src/python/pose_format/utils/alphapose.py @@ -0,0 +1,301 @@ +import re +import json +import numpy as np +from ..numpy.pose_body import NumPyPoseBody +from ..pose import Pose +from ..pose_header import PoseHeader, PoseHeaderComponent, PoseHeaderDimensions +from pose_format.utils.openpose import hand_colors + +BODY_POINTS = [ + "nose","left_eye","right_eye","left_ear","right_ear", + "left_shoulder","right_shoulder","left_elbow","right_elbow", + "left_wrist","right_wrist","left_hip","right_hip", + "left_knee","right_knee","left_ankle","right_ankle", + "left_big_toe","left_small_toe","left_heel", + "right_big_toe","right_small_toe","right_heel", +] + +BODY_LIMBS_NAMES = [ + ("left_ankle", "left_knee"), + ("left_knee", "left_hip"), + ("right_ankle", "right_knee"), + ("right_knee", "right_hip"), + ("left_hip", "right_hip"), + ("left_shoulder", "left_hip"), + ("right_shoulder", "right_hip"), + ("left_shoulder", "right_shoulder"), + ("left_shoulder", "left_elbow"), + ("right_shoulder", "right_elbow"), + ("left_elbow", "left_wrist"), + ("right_elbow", "right_wrist"), + ("left_eye", "right_eye"), + ("nose", "left_eye"), + ("nose", "right_eye"), + ("left_eye", "left_ear"), + ("right_eye", "right_ear"), + ("left_ear", "left_shoulder"), + ("right_ear", "right_shoulder"), + ("left_ankle", "left_big_toe"), + ("left_ankle", "left_small_toe"), + ("left_ankle", "left_heel"), + ("right_ankle", "right_big_toe"), + ("right_ankle", "right_small_toe"), + ("right_ankle", "right_heel"), +] + +LEFT_HAND_LIMBS_NAMES = [ + ("left_hand_0", "left_hand_1"), ("left_hand_1", "left_hand_2"), + ("left_hand_2", "left_hand_3"), ("left_hand_3", "left_hand_4"), + ("left_hand_0", "left_hand_5"), ("left_hand_5", 
"left_hand_6"), + ("left_hand_6", "left_hand_7"), ("left_hand_7", "left_hand_8"), + ("left_hand_0", "left_hand_9"), ("left_hand_9", "left_hand_10"), + ("left_hand_10", "left_hand_11"), ("left_hand_11", "left_hand_12"), + ("left_hand_0", "left_hand_13"), ("left_hand_13", "left_hand_14"), + ("left_hand_14", "left_hand_15"), ("left_hand_15", "left_hand_16"), + ("left_hand_0", "left_hand_17"), ("left_hand_17", "left_hand_18"), + ("left_hand_18", "left_hand_19"), ("left_hand_19", "left_hand_20"), +] + +RIGHT_HAND_LIMBS_NAMES = [ + ("right_hand_0", "right_hand_1"), ("right_hand_1", "right_hand_2"), + ("right_hand_2", "right_hand_3"), ("right_hand_3", "right_hand_4"), + ("right_hand_0", "right_hand_5"), ("right_hand_5", "right_hand_6"), + ("right_hand_6", "right_hand_7"), ("right_hand_7", "right_hand_8"), + ("right_hand_0", "right_hand_9"), ("right_hand_9", "right_hand_10"), + ("right_hand_10", "right_hand_11"), ("right_hand_11", "right_hand_12"), + ("right_hand_0", "right_hand_13"), ("right_hand_13", "right_hand_14"), + ("right_hand_14", "right_hand_15"), ("right_hand_15", "right_hand_16"), + ("right_hand_0", "right_hand_17"), ("right_hand_17", "right_hand_18"), + ("right_hand_18", "right_hand_19"), ("right_hand_19", "right_hand_20"), +] + +FACE_POINTS = [f"face-{i}" for i in range(68)] +LEFT_HAND_POINTS = [f"left_hand_{i}" for i in range(21)] +RIGHT_HAND_POINTS = [f"right_hand_{i}" for i in range(21)] +GENERAL_HAND_POINTS = [f"hand_{i}" for i in range(21)] + +def get_alphapose_components(): + """ + Creates a list of alphapose components. + + Returns + ------- + list of PoseHeaderComponent + List of alphapose components. 
+ """ + + def map_limbs(points, limbs): + index_map = {name: idx for idx, name in enumerate(points)} + return [ + (index_map[a], index_map[b]) + for (a, b) in limbs + ] + + components = [ + PoseHeaderComponent( + name="BODY", + points=BODY_POINTS, + limbs= map_limbs(BODY_POINTS, BODY_LIMBS_NAMES), + colors=[(0,255,0)], + point_format="XYC" + ), + + PoseHeaderComponent( + name="FACE", + points=FACE_POINTS, + limbs=[], # WholeBody face mesh is huge, usually omitted + colors=[(255,255,255)], + point_format="XYC" + ), + + PoseHeaderComponent( + name="LEFT_HAND", + points=GENERAL_HAND_POINTS, + limbs= map_limbs(LEFT_HAND_POINTS, LEFT_HAND_LIMBS_NAMES), + colors=[(0,255,0)], + point_format="XYC" + ), + + PoseHeaderComponent( + name="RIGHT_HAND", + points=GENERAL_HAND_POINTS, + limbs= map_limbs(RIGHT_HAND_POINTS, RIGHT_HAND_LIMBS_NAMES), + colors=[(255,128,0)], + point_format="XYC" + ), + ] + return components + +def load_alphapose_json(json_path): + """ + Load AlphaPose results in either: + + FORMAT A (original): + [ + {"image_id": "0.jpg", "keypoints": [x_0, y_0, c_0, x_1, y_1, c_1, ...], "other keys not used"}, + {"image_id": "1.jpg", "keypoints": [...], ...}, + ... + ] + + FORMAT B (extended): + { + "frames": [... same as above ...], + "metadata": { + "fps": float, + "width": int, + "height": int + } + } + + Returns + ------- + data : list + Sorted list of frame detections. + meta : dict or None + Metadata if present, else None. 
+ """ + with open(json_path, "r") as f: + raw = json.load(f) + + # ----------------------- + # Detect FORMAT B (dict) + # ----------------------- + if isinstance(raw, dict) and "frames" in raw: + frames = raw["frames"] + + # Extract metadata safely + meta = { + "fps": raw.get("metadata", {}).get("fps", None), + "width": raw.get("metadata", {}).get("width", None), + "height": raw.get("metadata", {}).get("height", None), + } + else: + # FORMAT A (list) + frames = raw + meta = None + + # ----------------------- + # Sorting function + # ----------------------- + def extract_frame_number(item): + """ + Extract numeric part from "image_id". + Example: "frame_0012.jpg" → 12 + """ + matches = re.findall(r"\d+", item["image_id"]) + return int(matches[0]) if matches else -1 # fallback if no digits + + # Sort frames numerically + frames = sorted(frames, key=extract_frame_number) + + return frames, meta + +def load_alphapose_wholebody_from_json(input_path: str, + version: float = 0.2, + fps: float = 24, + width=1000, + height=1000, + depth=0) -> Pose: + """ + Loads alphapose_wholebody pose data + + Parameters + ---------- + input_path : str + Path to the input AlphaPose .json file. 
+ + Returns + ------- + Pose + Loaded pose data with header and body + """ + print("Loading pose with alphapose_wholebody...") + + # Load frames + optional metadata + frames, metadata = load_alphapose_json(input_path) + + # JSON metadata, when present, overrides the fps/width/height arguments + if metadata is not None: + if metadata.get("fps") is not None: + fps = metadata["fps"] + if metadata.get("width") is not None: + width = metadata["width"] + if metadata.get("height") is not None: + height = metadata["height"] + + frames_xy = [] + frames_conf = [] + + # Parse and reorder all frames + for item in frames: + xy, conf = parse_keypoints_and_confidence(item["keypoints"]) + xy_ord, conf_ord = reorder_133_kpts(xy, conf) + + frames_xy.append(xy_ord) + frames_conf.append(conf_ord) + + # Convert to arrays + xy_data = np.stack(frames_xy, axis=0) # (num_frames, num_keypoints, 2) + conf_data = np.stack(frames_conf, axis=0) # (num_frames, num_keypoints) + + # Add people dimension: + xy_data = xy_data[:, None, :, :] # (num_frames, people, num_keypoints, 2) with people = 1 + conf_data = conf_data[:, None, :] # (num_frames, people, num_keypoints) with people = 1 + + # Build header + header: PoseHeader = PoseHeader(version=version, + dimensions=PoseHeaderDimensions(width=width, height=height, depth=depth), + components=get_alphapose_components()) + + # Build body + body: NumPyPoseBody = NumPyPoseBody(fps=fps, data=xy_data, confidence=conf_data) + + return Pose(header, body) + +def parse_keypoints_and_confidence(flat): + """ + AlphaPose stores keypoints as: + [x0, y0, c0, x1, y1, c1, ...] + Expected total length for 133 keypoints: + 133 * 3 = 399 values + + Returns: + xy: (133, 2) + conf: (133,) + """ + if len(flat) != 133 * 3: + raise ValueError(f"ERROR: Expected 133 keypoints (399 values), but got {len(flat)} values. " + f"This converter only supports AlphaPose WholeBody-133.") 
+ + arr = np.array(flat).reshape(-1, 3) + xy = arr[:, :2] + conf = arr[:, 2] + return xy, conf + + +def reorder_133_kpts(xy, conf): + """ + Reorder XY and confidence to BODY + FACE + L_HAND + R_HAND. + AlphaPose 133 indexing: + - BODY: 0–22 + - FACE: 23–90 + - LH: 91–111 + - RH: 112–132 + """ + body = xy[0:23] + face = xy[23:23+68] + lh = xy[91:91+21] + rh = xy[112:112+21] + + xy_reordered = np.concatenate([body, face, lh, rh], axis=0) + + # Apply same order to confidence + conf_reordered = np.concatenate([ + conf[0:23], + conf[23:23+68], + conf[91:91+21], + conf[112:112+21], + ], axis=0) + + return xy_reordered, conf_reordered + diff --git a/src/python/pose_format/utils/generic.py b/src/python/pose_format/utils/generic.py index 653d9ae..0b37b2c 100644 --- a/src/python/pose_format/utils/generic.py +++ b/src/python/pose_format/utils/generic.py @@ -9,11 +9,13 @@ from pose_format.utils.openpose import OpenPose_Components from pose_format.utils.openpose import BODY_POINTS as OPENPOSE_BODY_POINTS from pose_format.utils.openpose_135 import OpenPose_Components as OpenPose135_Components +from pose_format.utils.alphapose import get_alphapose_components +from pose_format.utils.alphapose import BODY_POINTS as ALPHAPOSE_BODY_POINTS # from pose_format.utils.holistic import holistic_components # The import above creates an error: ImportError: Please install mediapipe with: pip install mediapipe -KnownPoseFormat = Literal["holistic", "openpose", "openpose_135"] +KnownPoseFormat = Literal["holistic", "openpose", "openpose_135", "alphapose"] def get_component_names( @@ -41,6 +43,8 @@ def detect_known_pose_format(pose_or_header: Union[Pose,PoseHeader]) -> KnownPos openpose_135_components = [c.name for c in OpenPose135_Components] + alphapose_components = [c.name for c in get_alphapose_components()] + for component_name in component_names: if component_name in mediapipe_components: return "holistic" @@ -48,6 +52,8 @@ def detect_known_pose_format(pose_or_header: 
Union[Pose,PoseHeader]) -> KnownPos return "openpose" if component_name in openpose_135_components: return "openpose_135" + if component_name in alphapose_components: + return "alphapose" raise ValueError( f"Could not detect pose format, unknown pose header schema with component names: {component_names}" @@ -87,6 +93,18 @@ def pose_hide_legs(pose: Pose, remove: bool = False) -> Pose: # if any of the items in point_ points_to_remove_dict = {"pose_keypoints_2d": point_names_to_remove} + elif known_pose_format == "alphapose": + point_names_to_remove = [ + "left_hip", "right_hip", + "left_knee", "right_knee", + "left_ankle", "right_ankle", + "left_big_toe", "left_small_toe", "left_heel", + "right_big_toe", "right_small_toe", "right_heel", + ] + points_to_remove_dict = { + "BODY": point_names_to_remove + } + else: raise NotImplementedError( f"Unsupported pose header schema {known_pose_format} for {pose_hide_legs.__name__}: {pose.header}" @@ -124,6 +142,9 @@ def pose_shoulders(pose_header: PoseHeader) -> Tuple[Tuple[str, str], Tuple[str, if known_pose_format == "openpose": return ("pose_keypoints_2d", "RShoulder"), ("pose_keypoints_2d", "LShoulder") + if known_pose_format == "alphapose": + return ("BODY", "right_shoulder"), ("BODY", "left_shoulder") + raise NotImplementedError( f"Unsupported pose header schema {known_pose_format} for {pose_shoulders.__name__}: {pose_header}" ) @@ -142,6 +163,12 @@ def hands_indexes(pose_header: PoseHeader)-> List[int]: pose_header.get_point_index("hand_left_keypoints_2d", "M_CMC"), pose_header.get_point_index("hand_right_keypoints_2d", "M_CMC"), ] + + if known_pose_format == "alphapose": + return [ + pose_header.get_point_index("LEFT_HAND", "hand_9"), + pose_header.get_point_index("RIGHT_HAND", "hand_9"), + ] raise NotImplementedError( f"Unsupported pose header schema {known_pose_format} for {hands_indexes.__name__}: {pose_header}" ) @@ -163,7 +190,9 @@ def hands_components(pose_header: PoseHeader)-> Tuple[Tuple[str, str], Tuple[str if 
known_pose_format == "openpose": return ("hand_left_keypoints_2d", "hand_right_keypoints_2d"), ("BASE", "P_CMC", "I_CMC"), ("BASE", "M_CMC") - + + if known_pose_format == "alphapose": + return ("LEFT_HAND", "RIGHT_HAND"), ("hand_0", "hand_17", "hand_5"), ("hand_0", "hand_9") raise NotImplementedError( f"Unsupported pose header schema '{known_pose_format}' for {hands_components.__name__}: {pose_header}" ) @@ -209,6 +238,8 @@ def get_standard_components_for_known_format(known_pose_format: KnownPoseFormat) return OpenPose_Components if known_pose_format == "openpose_135": return OpenPose135_Components + if known_pose_format == "alphapose": + return get_alphapose_components() raise NotImplementedError(f"Unsupported pose header schema {known_pose_format}") @@ -241,6 +272,8 @@ def get_hand_wrist_index(pose: Pose, hand: str)-> int: return pose.header.get_point_index(f"{hand.upper()}_HAND_LANDMARKS", "WRIST") if known_pose_format == "openpose": return pose.header.get_point_index(f"hand_{hand.lower()}_keypoints_2d", "BASE") + if known_pose_format == "alphapose": + return pose.header.get_point_index(f"{hand.upper()}_HAND", f"hand_0") raise NotImplementedError( f"Unsupported pose header schema {known_pose_format} for {get_hand_wrist_index.__name__}: {pose.header}" ) @@ -252,6 +285,8 @@ def get_body_hand_wrist_index(pose: Pose, hand: str)-> int: return pose.header.get_point_index("POSE_LANDMARKS", f"{hand.upper()}_WRIST") if known_pose_format == "openpose": return pose.header.get_point_index("pose_keypoints_2d", f"{hand.upper()[0]}Wrist") + if known_pose_format == "alphapose": + return pose.header.get_point_index("BODY", f"{hand.lower()}_wrist") raise NotImplementedError( f"Unsupported pose header schema {known_pose_format} for {get_body_hand_wrist_index.__name__}: {pose.header}" ) diff --git a/src/python/pose_format/utils/generic_test.py b/src/python/pose_format/utils/generic_test.py index e1b5912..4345542 100644 --- a/src/python/pose_format/utils/generic_test.py +++ 
b/src/python/pose_format/utils/generic_test.py @@ -224,6 +224,22 @@ def test_pose_remove_legs(fake_poses: List[Pose]): component_index = c_names.index("pose_keypoints_2d") pose_with_legs_removed = pose_hide_legs(pose, remove=True) + for point_name in points_that_should_be_removed: + assert point_name not in pose_with_legs_removed.header.components[component_index].points, f"{pose_with_legs_removed.header.components[component_index].name},{pose_with_legs_removed.header.components[component_index].points}" + assert point_name in pose.header.components[component_index].points + + elif known_pose_format == "alphapose": + c_names = [c.name for c in pose.header.components] + points_that_should_be_removed = [ + "left_hip", "right_hip", + "left_knee", "right_knee", + "left_ankle", "right_ankle", + "left_big_toe", "left_small_toe", "left_heel", + "right_big_toe", "right_small_toe", "right_heel", + ] + component_index = c_names.index("BODY") + pose_with_legs_removed = pose_hide_legs(pose, remove=True) + for point_name in points_that_should_be_removed: assert point_name not in pose_with_legs_removed.header.components[component_index].points, f"{pose_with_legs_removed.header.components[component_index].name},{pose_with_legs_removed.header.components[component_index].points}" assert point_name in pose.header.components[component_index].points @@ -272,6 +288,9 @@ def test_fake_pose(known_pose_format: KnownPoseFormat): assert point_formats[0] == "XYC" elif detected_format == 'openpose_135': assert point_formats[0] == "XYC" + elif detected_format == 'alphapose': + assert point_formats[0] == "XYC" + assert detected_format == known_pose_format assert pose.body.fps == fps diff --git a/src/python/pyproject.toml b/src/python/pyproject.toml index 2855dd7..296f548 100644 --- a/src/python/pyproject.toml +++ b/src/python/pyproject.toml @@ -73,5 +73,6 @@ disable = [ [project.scripts] pose_info = "pose_format.bin.pose_info:main" video_to_pose = "pose_format.bin.pose_estimation:main" 
+json_to_pose = "pose_format.bin.json_to_pose:main" videos_to_poses = "pose_format.bin.directory:main" visualize_pose = "pose_format.bin.pose_visualizer:main"