Initial Serializer class refactor, very WIP, partial serialization working

This commit is contained in:
Jonas Holzman 2025-05-19 12:45:00 +02:00
parent 90aa72a767
commit 2f75f7492d
2 changed files with 340 additions and 66 deletions

389
dumper.py
View File

@ -1,52 +1,329 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from copy import copy
from dataclasses import dataclass
from os.path import abspath
from typing import Any
import bpy
from . import utils
def get_dumper(bl_object: bpy.types.bpy_struct) -> type[Dumper]:
"""Get the closest corresponding dumper for a given Blender object using its MRO"""
for cls in bl_object.__class__.mro():
dumper_map = DumperRegistry().dumper_map
if cls in dumper_map:
return dumper_map[cls]
# Fallback to base Dumper if no matches are found
return Dumper
from . utils import BlenderProperty
def dump_nodes(nodes: list[bpy.types.Node]):
"""Generic recursive dump, convert nodes into a dict"""
Dumper.pointers.clear() # TODO: Bad global
data = [dump_node(node) for node in nodes]
Dumper.pointers.clear()
dump_pointers = {}
data = [Serializer.serialize(node, dump_pointers) for node in nodes]
return data
def dump_node(node: bpy.types.Node):
dumper = get_dumper(node)
return dumper.dump(node)
def load_nodes(data, node_tree):
"""Load/Dump nodes into a specific node tree"""
Dumper.pointers.clear()
dump_pointers = {}
Serializer.deserialize(data, node_tree, dump_pointers)
dumper = get_dumper(node_tree)
dumper.load(data, node_tree)
Dumper.pointers.clear()
# TODO: Sub serialize function where the isinstance is set to the default number of things.
# TODO: Collection is not handled as a class anymore, handle it manually
class Serializer(ABC):
"""
Base Serializer class, from which other Serializer are derived.
`bl_pointers_ref` corresponds to a mutable dict passed through serialize/deserialize
functions, containing Blender object pointers from the current operation from relinking.
"""
# Whitelisted properties, applied after the blacklist
prop_whitelist = None
# Properties that are excluded from (de)serialization, in addition to any bl_* properties
prop_blacklist = ("rna_type", "id_data", "depsgraph")
serializer_map = {}
# --- Getters for sub-serializers ---
@classmethod
def get_serializer_map(cls) -> dict[type[bpy.types.bpy_struct], type[Serializer]]:
"""Store the Serializer Map in a class variable for simple caching"""
if not cls.serializer_map:
for subclass in utils.all_subclasses(Dumper):
assert hasattr(subclass, "bl_type")
cls.serializer_map[subclass.bl_type] = subclass
return cls.serializer_map
@classmethod
def get_serializer(cls, bl_object: bpy.types.bpy_struct) -> type[Serializer]:
"""Get the closest corresponding dumper for a given Blender object using its MRO"""
serializer_map = cls.get_serializer_map()
bl_type = type(bl_object.bl_rna.type_recast())
for bl_parents in bl_type.mro():
if bl_parents in serializer_map:
return serializer_map[bl_parents]
# Fallback to base Serializer if no matches are found
return Serializer
# --- Properties to (de)serialize ---
@classmethod
def get_serialized_properties(cls, obj: bpy.types.bpy_struct | Any):
serialized_properties: list[BlenderProperty] = [
BlenderProperty(rep=prop, attr=getattr(obj, prop.identifier))
for prop in obj.bl_rna.properties
if not prop.identifier.startswith("bl_") # Exclude internal Blender properties
and prop.identifier not in cls.prop_blacklist # Additional blacklist filtering
]
if cls.prop_whitelist: # Additional whitelist, applied after the blacklist
serialized_properties: list[BlenderProperty] = [
prop for prop in serialized_properties
if prop.rep.identifier in cls.prop_whitelist
]
return serialized_properties
# --- Serialization ---
@classmethod
@abstractmethod
def serialize(cls, obj: bpy.types.bpy_struct | Any, bl_pointers_ref: dict) -> dict:
"""Base Serialize method, overridded by subclasses"""
# Early recursive return case
# TODO: Ported as is, check the heuristics, (if there are more or attributes type to add)
if isinstance(obj, (str, int, float, dict, list, type(None))):
return obj
# Returned data, tracks the pointer for later re-assignments during deserialization
data = {"_kit_ptr": obj.as_pointer()}
bl_pointers_ref[obj.as_pointer()] = obj
# Iterate over all *filtered* properties found in the object
for bl_prop in cls.get_serialized_properties(obj):
# Do not store default values nor read-only properties
if (array := getattr(bl_prop.rep, "default_array", None)) and bl_prop.attr == array:
continue
if isinstance(bl_prop.attr, (str, int, float)) and bl_prop.attr == bl_prop.rep.default:
continue
if obj.is_property_readonly(bl_prop.rep.identifier):
continue
print(type(bl_prop.attr))
# Serialize each property
data[bl_prop.rep.identifier] = cls.serialize_property(bl_prop, bl_pointers_ref)
return data
@classmethod
def serialize_property(cls, bl_prop: BlenderProperty, bl_pointers_ref: dict) -> Any:
"""Serialize node property, special cases for arrays/collections/pointers"""
# Property array case
if getattr(bl_prop.rep, "is_array", False):
# Contained in BoolProperty, IntProperty and FloatProperty
prop_array = []
for item in bl_prop.attr:
assert isinstance(item, (bool, int, float)) # TODO: For development, replace by list comprehension later
prop_array.append(item)
return prop_array
# Collection case
if isinstance(bl_prop.attr, bpy.types.bpy_prop_collection):
collection = bl_prop.attr
if not collection:
return []
values = [cls.sub_serialize(sub_prop) for sub_prop in collection]
# TODO: Check why the original code has a None check
return [v for v in values if v is not None]
# Pointer case
if bl_prop.rep.type == "POINTER" and bl_prop.attr:
# Property points to another object, stores it ptr/deref value in our pointer table
ptr = bl_prop.attr.as_pointer()
if ptr in bl_pointers_ref:
return ptr
bl_pointers_ref[ptr] = bl_prop.attr
return cls.sub_serialize(bl_prop.attr, bl_pointers_ref)
@classmethod
def sub_serialize(cls, obj: bpy.types.bpy_struct | Any, bl_pointers_ref: dict) -> Any:
if not isinstance(obj, bpy.types.bpy_struct):
# Primitive type, return directly
return obj
"""Resolve which Serializer class to use"""
serializer = cls.get_serializer(obj)
return serializer.serialize(obj, bl_pointers_ref)
@classmethod
@abstractmethod
def construct_bl_object(cls, data: dict):
"""Abstract method for Serializer specificaiton to provide a way to construct their object"""
print("DEBUG: construct_bl_object called on Base Serializer, shouldn't happen")
return None
# --- Deserialization ---
@classmethod
@abstractmethod
def deserialize(cls, data: dict, target_obj: bpy.types.bpy_struct, bl_pointers_ref: dict):
if target_obj is None:
target_obj = cls.construct_bl_object(data)
if target_obj is None:
print("DEBUG: No method to construct object")
# Failed to construct object/no speciailization to construct object
return None
if (kit_ptr := data.get("_kit_ptr", None)):
bl_pointers_ref[kit_ptr] = target_obj
data_to_deserialize = cls.get_data_to_deserialize(data, target_obj)
for stored_key, stored_value in data_to_deserialize:
if stored_key.startswith("_kit") or stored_key not in target_obj.bl_rna.properties:
continue
target_bl_prop = BlenderProperty(rep=target_obj.bl_rna.properties[stored_key],
attr=getattr(target_obj, stored_key))
# Skip the property if it's read-only
if target_bl_prop.rep.is_readonly:
continue
# Unlike serialization, there's no property array case, as they are just directly assigned
# Collection case
if isinstance(target_bl_prop.attr, bpy.types.bpy_prop_collection):
cls.deserialize_collection(stored_value, target_bl_prop.attr, bl_pointers_ref)
continue
value_to_set = stored_value
# Pointer case
if target_bl_prop.rep.type == "POINTER":
value_to_set = cls.deserialize_pointer(stored_value, target_bl_prop.attr, bl_pointers_ref)
# Assign the property
setattr(target_obj, stored_key, value_to_set)
# If supported, update the Blender property after setting it
if hasattr(target_bl_prop.attr, "update"):
target_bl_prop.attr.update()
@classmethod
def deserialize_collection(cls, stored_value: Any, bl_coll: bpy.types.bpy_prop_collection, bl_pointers_ref: dict):
# Static collection case
if not hasattr(bl_coll, "new"):
cls.sub_deserialize(stored_value, bl_coll, bl_pointers_ref)
return
# TODO: Code pasted as is, review later
new_func = bl_coll.bl_rna.functions["new"]
for i, value in enumerate(stored_value):
if value.get("_new"):
params = value["_new"]
else:
params = {
k: value.get(k, utils.get_bl_default(v))
for k, v in new_func.parameters.items()[:-1]
}
# Replace arg pointer with bl object
valid_pointers = True
for param in bl_coll.bl_rna.functions["new"].parameters:
if param.identifier not in params or param.type != "POINTER":
continue
# TODO: It might be possible to abstract this into deserialize_pointer somehow
pointer_id = params[param.identifier]
if bl_object := bl_pointers_ref.get(pointer_id):
params[param.identifier] = bl_object
else:
print(f"No Pointer found for param {param.identifier} of {bl_coll}")
valid_pointers = False
if not valid_pointers:
continue
# TODO: Ugly bad :(
try:
item = bl_coll.new(**params)
except RuntimeError as e:
try:
item = bl_coll[i]
except IndexError as e:
break
@classmethod
def deserialize_pointer(cls, stored_value: Any, target_bl_prop_attr: bpy.types.bpy_struct, bl_pointers_ref: dict):
if stored_value is None:
return None
# Actual pointer
if isinstance(stored_value, int):
if stored_value not in bl_pointers_ref:
print("DEBUG: Pointer reference hasn't been loaded yet")
# Obtain a reference to a previously dereferenced object
return bl_pointers_ref[stored_value]
# Create the object by passing it to the sub-serializer
cls.sub_deserialize(stored_value, target_bl_prop_attr, bl_pointers_ref)
bl_pointers_ref[stored_value["_kit_ptr"]] = target_bl_prop_attr
return target_bl_prop_attr
@classmethod
def sub_deserialize(cls, data: dict, target_bl_obj: bpy.types.bpy_struct, bl_pointers_ref: dict):
# Get the deserializer for the corresponding target_bl_prop
# Introspects on its type, recreateas the object even if its None
deserializer = cls.get_serializer(target_bl_obj)
return deserializer.deserialize(data, bl_pointers_ref, target_bl_obj)
@classmethod
def get_data_to_deserialize(cls, data: dict, target_obj: bpy.types.bpy_struct=None):
props_to_deserialize = cls.get_serialized_properties(target_obj)
sorted_data = sorted(
data.items(), key=lambda x: props_to_deserialize.index(x[0]) if x[0] in props_to_deserialize else 0
)
return sorted_data
# class NodeSocket(Serializer):
# bl_type = bpy.types.NodeSocket
# prop_blacklist = Serializer.prop_blacklist + (
# "node",
# "links",
# "display_shape",
# "link_limit",
# )
# @classmethod
# def serialize(cls, socket_obj: bpy.types.NodeSocket, _: dict) -> dict:
# if socket_obj.is_unavailable:
# return None
# return super().serialize(socket_obj)
class Dumper:
pointers = {}
includes = []
excludes = ["rna_type", "bl_rna", "id_data", "depsgraph"]
@ -67,7 +344,7 @@ class Dumper:
print(f"New not implemented for data {data}")
@classmethod
def load(cls, data, bl_object=None):
def load(cls, data, bl_pointers_ref, bl_object=None):
if bl_object is None:
bl_object = cls.new(data)
@ -75,7 +352,7 @@ class Dumper:
return
if bl_pointer := data.get("bl_pointer"):
cls.pointers[bl_pointer] = bl_object
bl_pointers_ref[bl_pointer] = bl_object
props = cls.properties(bl_object)
for key, value in sorted(
@ -98,9 +375,9 @@ class Dumper:
elif prop.type == "POINTER":
if isinstance(value, int): # It's a pointer
if value not in cls.pointers:
if value not in bl_pointers_ref:
print(bl_object, "not loaded yet", prop)
value = cls.pointers[value]
value = bl_pointers_ref[value]
elif value is None:
utils.set_bl_attribute(bl_object, key, value)
@ -114,7 +391,7 @@ class Dumper:
attr = dumper.new(value)
dumper.load(value, attr)
cls.pointers[value["bl_pointer"]] = attr
bl_pointers_ref[value["bl_pointer"]] = attr
if hasattr(attr, "update"):
attr.update()
@ -135,12 +412,12 @@ class Dumper:
# return bl_object
@classmethod
def dump(cls, bl_object):
def dump(cls, bl_object, bl_pointers_ref):
if isinstance(bl_object, (str, int, float, dict, list, type(None))):
return bl_object
data = {"bl_pointer": bl_object.as_pointer()}
cls.pointers[bl_object.as_pointer()] = bl_object
bl_pointers_ref[bl_object.as_pointer()] = bl_object
for prop in cls.properties(bl_object):
if not hasattr(bl_object, prop.identifier):
@ -163,10 +440,10 @@ class Dumper:
value = PropCollection.dump(value)
elif prop.type == "POINTER" and value:
if value.as_pointer() in cls.pointers:
if value.as_pointer() in bl_pointers_ref:
value = value.as_pointer()
else:
cls.pointers[value.as_pointer()] = value
bl_pointers_ref[value.as_pointer()] = value
dumper = get_dumper(value)
value = dumper.dump(value)
@ -204,7 +481,7 @@ class PropCollection(Dumper):
dumper = None
if not hasattr(coll, "new"): # Static collection
for item, value in zip(coll, values):
for item, value in zip(coll, values): # TODO: That zip doesn't make sense, or does it?
dumper = dumper or get_dumper(item)
dumper.load(value, item)
@ -227,7 +504,7 @@ class PropCollection(Dumper):
continue
pointer_id = params[param.identifier]
if bl_object := cls.pointers.get(pointer_id):
if bl_object := bl_pointers_ref.get(pointer_id):
params[param.identifier] = bl_object
else:
print(f"No Pointer found for param {param.identifier} of {coll}")
@ -277,7 +554,7 @@ class NodeSocket(Dumper):
if socket.is_unavailable:
return None
# cls.pointers[socket.as_pointer()] = socket
# bl_pointers_ref[socket.as_pointer()] = socket
data = super().dump(socket)
@ -316,7 +593,7 @@ class NodeTreeInterfaceSocket(Dumper):
@classmethod
def dump(cls, socket):
# cls.pointers[socket.as_pointer()] = socket
# bl_pointers_ref[socket.as_pointer()] = socket
data = super().dump(socket)
# data["_id"] = socket.as_pointer()
@ -340,7 +617,7 @@ class NodeSockets(PropCollection):
node_sockets = [s for s in coll if not s.is_unavailable]
for socket, value in zip(node_sockets, values):
cls.pointers[value["bl_pointer"]] = socket
bl_pointers_ref[value["bl_pointer"]] = socket
Dumper.load(value, socket)
# for k, v in value.items():
# if k not in socket.bl_rna.properties:
@ -353,7 +630,7 @@ class NodeSockets(PropCollection):
if len(node_sockets) == len(inputs): # Match by index
super().load({"inputs": inputs}, node)
for socket, value in zip(node_sockets, coll):
cls.pointers[value['_id']] = socket
bl_pointers_ref[value['_id']] = socket
else: # Match by name
print(f'Match Inputs by Name for node {node}')
for socket in node_sockets:
@ -363,7 +640,7 @@ class NodeSockets(PropCollection):
value = inputs[index]
print(socket, value)
cls.pointers[value['_id']] = socket
bl_pointers_ref[value['_id']] = socket
Dumper.load(value, socket)
del inputs[index]
@ -389,7 +666,7 @@ class Node(Dumper):
@classmethod
def dump(cls, node=None):
# cls.pointers[node.as_pointer()] = node
# bl_pointers_ref[node.as_pointer()] = node
data = super().dump(node)
# data["_id"] = node.as_pointer()
@ -409,7 +686,7 @@ class Node(Dumper):
def load(cls, data, node):
if node is None:
return
# cls.pointers[data['bl_pointer']] = node
# bl_pointers_ref[data['bl_pointer']] = node
inputs = copy(data.pop("inputs", []))
outputs = copy(data.pop("outputs", []))
@ -465,8 +742,8 @@ class Nodes(PropCollection):
# Pair zone input and output
for node_data in values:
if paired_output_id := node_data.get("_pair_with_output", None):
node = cls.pointers[node_data["bl_pointer"]]
node.pair_with_output(cls.pointers[paired_output_id])
node = bl_pointers_ref[node_data["bl_pointer"]]
node.pair_with_output(bl_pointers_ref[paired_output_id])
Dumper.load(
{"inputs": node_data["inputs"], "outputs": node_data["outputs"]},
@ -701,7 +978,7 @@ class CompositorNodeRLayers(Node):
view_layer_data[prop.identifier]
"""
# cls.pointers[bl_object.as_pointer()] = bl_object
# bl_pointers_ref[bl_object.as_pointer()] = bl_object
data["scene"] = {
"bl_pointer": node.scene.as_pointer(),
@ -743,21 +1020,3 @@ class ViewLayers(PropCollection):
view_layer = coll.new(value["name"])
Dumper.load(value, view_layer)
class DumperRegistry:
"""Singleton-like class that holds a map of all parsers, constructed on first instantiation"""
dumper_map = None
def __init__(self):
if self.dumper_map is None:
self.construct_dumper_map()
@classmethod
def construct_dumper_map(cls):
cls.dumper_map = {}
for subclass in utils.all_subclasses(Dumper):
assert hasattr(subclass, "bl_type")
cls.dumper_map[subclass.bl_type] = subclass
print(cls.dumper_map)

View File

@ -1,6 +1,21 @@
from dataclasses import dataclass
from typing import Any
import bpy
@dataclass
class BlenderProperty:
"""
Blender Property abstraction, used since a Blender property value isn't
directly accessible from its Property object representation
NOTE: Do not rely on value being up-to-date, data will get stall
"""
rep: bpy.types.Property
attr: Any
def all_subclasses(cls):
return set(cls.__subclasses__()).union(
[s for c in cls.__subclasses__() for s in all_subclasses(c)]