Static type checking with mypy

This does introduce some not-so-nice things, like having to annotate each
`__init__` function with `-> None`. However, the benefits of having static
type checking in a complex bit of software like BAT outweigh the downsides.
This commit is contained in:
Sybren A. Stüvel 2018-03-09 10:54:06 +01:00
parent 632d01334c
commit fdbbc3a20d
18 changed files with 117 additions and 82 deletions

1
.gitignore vendored
View File

@ -3,6 +3,7 @@
__pycache__
/*.egg-info/
/.cache
/.mypy_cache/
/.pytest_cache
.coverage
/dist/

View File

@ -23,3 +23,11 @@ There are two object types used to represent file paths. Those are strictly sepa
When it is necessary to interpret a `bpathlib.BlendPath` as a real path instead of a sequence of
bytes, BAT first attempts to decode it as UTF-8. If that fails, the local filesystem encoding is
used. The latter is also no guarantee of correctness, though.
## Type checking
The code statically type-checked with [mypy](http://mypy-lang.org/).
Mypy likes to see the return type of `__init__` methods explicitly declared as `None`. Until issue
[#604](https://github.com/python/mypy/issues/604) is resolved, we just do this in our code too.

View File

@ -22,14 +22,13 @@
import atexit
import collections
import functools
import gzip
import logging
import os
import struct
import pathlib
import tempfile
import functools
import typing
from . import exceptions, dna_io, dna, header
@ -40,8 +39,9 @@ log = logging.getLogger(__name__)
FILE_BUFFER_SIZE = 1024 * 1024
BLENDFILE_MAGIC = b'BLENDER'
GZIP_MAGIC = b'\x1f\x8b'
BFBList = typing.List['BlendFileBlock']
_cached_bfiles = {}
_cached_bfiles = {} # type: typing.Dict[pathlib.Path, BlendFile]
def open_cached(path: pathlib.Path, mode='rb', assert_cached=False) -> 'BlendFile':
@ -94,7 +94,7 @@ class BlendFile:
"""
log = log.getChild('BlendFile')
def __init__(self, path: pathlib.Path, mode='rb'):
def __init__(self, path: pathlib.Path, mode='rb') -> None:
"""Create a BlendFile instance for the blend file at the path.
Opens the file for reading or writing pending on the access. Compressed
@ -106,22 +106,21 @@ class BlendFile:
self.filepath = path
self.raw_filepath = path
self._is_modified = False
self.fileobj = None # type: typing.IO[bytes]
self._open_file(path, mode)
self.blocks = [] # BlendFileBlocks, in disk order.
self.code_index = collections.defaultdict(list)
self.structs = []
self.sdna_index_from_id = {}
self.block_from_addr = {}
self.blocks = [] # type: BFBList
"""BlendFileBlocks of this file, in disk order."""
self.code_index = collections.defaultdict(list) # type: typing.Dict[bytes, BFBList]
self.structs = [] # type: typing.List[dna.Struct]
self.sdna_index_from_id = {} # type: typing.Dict[bytes, int]
self.block_from_addr = {} # type: typing.Dict[int, BlendFileBlock]
try:
self.header = header.BlendFileHeader(self.fileobj, self.raw_filepath)
self.block_header_struct = self.header.create_block_header_struct()
self._load_blocks()
except Exception:
self.fileobj.close()
raise
def _open_file(self, path: pathlib.Path, mode: str):
"""Open a blend file, decompressing if necessary.
@ -134,7 +133,10 @@ class BlendFile:
correct magic bytes.
"""
fileobj = path.open(mode, buffering=FILE_BUFFER_SIZE)
if 'b' not in mode:
raise ValueError('Only binary modes are supported, not %r' % mode)
fileobj = path.open(mode, buffering=FILE_BUFFER_SIZE) # typing.IO[bytes]
fileobj.seek(0, os.SEEK_SET)
magic = fileobj.read(len(BLENDFILE_MAGIC))
@ -171,13 +173,16 @@ class BlendFile:
def _load_blocks(self):
"""Read the blend file to load its DNA structure to memory."""
self.structs.clear()
self.sdna_index_from_id.clear()
while True:
block = BlendFileBlock(self)
if block.code == b'ENDB':
break
if block.code == b'DNA1':
self.structs, self.sdna_index_from_id = self.decode_structs(block)
self.decode_structs(block)
else:
self.fileobj.seek(block.size, os.SEEK_CUR)
@ -270,6 +275,10 @@ class BlendFile:
DNACatalog is a catalog of all information in the DNA1 file-block
"""
self.log.debug("building DNA catalog")
# Get some names in the local scope for faster access.
structs = self.structs
sdna_index_from_id = self.sdna_index_from_id
endian = self.header.endian
shortstruct = endian.USHORT
shortstruct2 = endian.USHORT2
@ -282,8 +291,6 @@ class BlendFile:
data = self.fileobj.read(block.size)
types = []
typenames = []
structs = []
sdna_index_from_id = {}
offset = 8
names_len = intstruct.unpack_from(data, offset)[0]
@ -346,8 +353,6 @@ class BlendFile:
dna_struct.append_field(field)
dna_offset += dna_size
return structs, sdna_index_from_id
def abspath(self, relpath: bpathlib.BlendPath) -> bpathlib.BlendPath:
"""Construct an absolute path from a blendfile-relative path."""
@ -391,7 +396,7 @@ class BlendFileBlock:
old_structure = struct.Struct(b'4sI')
"""old blend files ENDB block structure"""
def __init__(self, bfile: BlendFile):
def __init__(self, bfile: BlendFile) -> None:
self.bfile = bfile
# Defaults; actual values are set by interpreting the block header.
@ -406,7 +411,7 @@ class BlendFileBlock:
Points to the data after the block header.
"""
self.endian = bfile.header.endian
self._id_name = ... # see the id_name property
self._id_name = ... # type: typing.Union[None, ellipsis, bytes]
header_struct = bfile.block_header_struct
data = bfile.fileobj.read(header_struct.size)
@ -448,7 +453,7 @@ class BlendFileBlock:
def __hash__(self) -> int:
return hash((self.code, self.addr_old, self.bfile.filepath))
def __eq__(self, other: 'BlendFileBlock') -> bool:
def __eq__(self, other: object) -> bool:
if not isinstance(other, BlendFileBlock):
return False
return (self.code == other.code and
@ -487,7 +492,10 @@ class BlendFileBlock:
self._id_name = self[b'id', b'name']
except KeyError:
self._id_name = None
return self._id_name
# TODO(Sybren): figure out how to let mypy know self._id_name cannot
# be ellipsis at this point.
return self._id_name # type: ignore
def refine_type_from_index(self, sdna_index: int):
"""Change the DNA Struct associated with this block.
@ -514,7 +522,7 @@ class BlendFileBlock:
sdna_index = self.bfile.sdna_index_from_id[dna_type_id]
self.refine_type_from_index(sdna_index)
def abs_offset(self, path: dna.FieldPath) -> (int, int):
def abs_offset(self, path: dna.FieldPath) -> typing.Tuple[int, int]:
"""Compute the absolute file offset of the field.
:returns: tuple (offset in bytes, length of array in items)
@ -563,18 +571,19 @@ class BlendFileBlock:
default=...,
null_terminated=True,
as_str=True,
) -> typing.Iterator[typing.Tuple[bytes, typing.Any]]:
) -> typing.Iterator[typing.Tuple[dna.FieldPath, typing.Any]]:
"""Generator, yields (path, property value) tuples.
If a property cannot be decoded, a string representing its DNA type
name is used as its value instead, between pointy brackets.
"""
path_full = path # type: dna.FieldPath
if path_root:
path_full = (
(path_root if type(path_root) is tuple else (path_root,)) +
(path if type(path) is tuple else (path,)))
else:
path_full = path
if isinstance(path_root, bytes):
path_root = (path_root,)
if isinstance(path, bytes):
path = (path,)
path_full = tuple(path_root) + tuple(path)
try:
# Try accessing as simple property
@ -615,7 +624,7 @@ class BlendFileBlock:
hsh = zlib.adler32(str(value).encode(), hsh)
return hsh
def set(self, path: dna.FieldPath, value):
def set(self, path: bytes, value):
dna_struct = self.bfile.structs[self.sdna_index]
self.bfile.mark_modified()
self.bfile.fileobj.seek(self.file_offset, os.SEEK_SET)

View File

@ -13,7 +13,7 @@ log = logging.getLogger(__name__)
class Name:
"""dna.Name is a C-type name stored in the DNA as bytes."""
def __init__(self, name_full: bytes):
def __init__(self, name_full: bytes) -> None:
self.name_full = name_full
self.name_only = self.calc_name_only()
self.is_pointer = self.calc_is_pointer()
@ -72,7 +72,7 @@ class Field:
dna_type: 'Struct',
name: Name,
size: int,
offset: int):
offset: int) -> None:
self.dna_type = dna_type
self.name = name
self.size = size
@ -87,7 +87,7 @@ class Struct:
log = log.getChild('Struct')
def __init__(self, dna_type_id: bytes, size: int = None):
def __init__(self, dna_type_id: bytes, size: int = None) -> None:
"""
:param dna_type_id: name of the struct in C, like b'AlembicObjectPath'.
:param size: only for unit tests; typically set after construction by
@ -96,8 +96,8 @@ class Struct:
"""
self.dna_type_id = dna_type_id
self._size = size
self._fields = []
self._fields_by_name = {}
self._fields = [] # type: typing.List[Field]
self._fields_by_name = {} # type: typing.Dict[bytes, Field]
def __repr__(self):
return '%s(%r)' % (type(self).__qualname__, self.dna_type_id)
@ -183,7 +183,7 @@ class Struct:
def field_get(self,
file_header: header.BlendFileHeader,
fileobj: typing.BinaryIO,
fileobj: typing.IO[bytes],
path: FieldPath,
default=...,
null_terminated=True,
@ -263,7 +263,7 @@ class Struct:
def field_set(self,
file_header: header.BlendFileHeader,
fileobj: typing.BinaryIO,
fileobj: typing.IO[bytes],
path: bytes,
value: typing.Any):
"""Write a value to the blend file.

View File

@ -16,7 +16,7 @@ class EndianIO:
ULONG = struct.Struct(b'<Q')
@classmethod
def _read(cls, fileobj: typing.BinaryIO, typestruct: struct.Struct):
def _read(cls, fileobj: typing.IO[bytes], typestruct: struct.Struct):
data = fileobj.read(typestruct.size)
try:
return typestruct.unpack(data)[0]
@ -24,35 +24,35 @@ class EndianIO:
raise struct.error('%s (read %d bytes)' % (ex, len(data))) from None
@classmethod
def read_char(cls, fileobj: typing.BinaryIO):
def read_char(cls, fileobj: typing.IO[bytes]):
return cls._read(fileobj, cls.UCHAR)
@classmethod
def read_ushort(cls, fileobj: typing.BinaryIO):
def read_ushort(cls, fileobj: typing.IO[bytes]):
return cls._read(fileobj, cls.USHORT)
@classmethod
def read_short(cls, fileobj: typing.BinaryIO):
def read_short(cls, fileobj: typing.IO[bytes]):
return cls._read(fileobj, cls.SSHORT)
@classmethod
def read_uint(cls, fileobj: typing.BinaryIO):
def read_uint(cls, fileobj: typing.IO[bytes]):
return cls._read(fileobj, cls.UINT)
@classmethod
def read_int(cls, fileobj: typing.BinaryIO):
def read_int(cls, fileobj: typing.IO[bytes]):
return cls._read(fileobj, cls.SINT)
@classmethod
def read_float(cls, fileobj: typing.BinaryIO):
def read_float(cls, fileobj: typing.IO[bytes]):
return cls._read(fileobj, cls.FLOAT)
@classmethod
def read_ulong(cls, fileobj: typing.BinaryIO):
def read_ulong(cls, fileobj: typing.IO[bytes]):
return cls._read(fileobj, cls.ULONG)
@classmethod
def read_pointer(cls, fileobj: typing.BinaryIO, pointer_size: int):
def read_pointer(cls, fileobj: typing.IO[bytes], pointer_size: int):
"""Read a pointer from a file."""
if pointer_size == 4:
@ -62,7 +62,7 @@ class EndianIO:
raise ValueError('unsupported pointer size %d' % pointer_size)
@classmethod
def write_string(cls, fileobj: typing.BinaryIO, astring: str, fieldlen: int) -> int:
def write_string(cls, fileobj: typing.IO[bytes], astring: str, fieldlen: int) -> int:
"""Write a (truncated) string as UTF-8.
The string will always be written 0-terminated.
@ -94,7 +94,7 @@ class EndianIO:
return fileobj.write(encoded + b'\0')
@classmethod
def write_bytes(cls, fileobj: typing.BinaryIO, data: bytes, fieldlen: int) -> int:
def write_bytes(cls, fileobj: typing.IO[bytes], data: bytes, fieldlen: int) -> int:
"""Write (truncated) bytes.
When len(data) < fieldlen, a terminating b'\0' will be appended.

View File

@ -27,7 +27,7 @@ import pathlib
class BlendFileError(Exception):
"""Raised when there was an error reading/parsing a blend file."""
def __init__(self, message: str, filepath: pathlib.Path):
def __init__(self, message: str, filepath: pathlib.Path) -> None:
super().__init__(message)
self.filepath = filepath
@ -48,7 +48,7 @@ class NoReaderImplemented(NotImplementedError):
:type dna_type: blender_asset_tracer.blendfile.dna.Struct
"""
def __init__(self, message: str, dna_name, dna_type):
def __init__(self, message: str, dna_name, dna_type) -> None:
super().__init__(message)
self.dna_name = dna_name
self.dna_type = dna_type
@ -61,7 +61,7 @@ class NoWriterImplemented(NotImplementedError):
:type dna_type: blender_asset_tracer.blendfile.dna.Struct
"""
def __init__(self, message: str, dna_name, dna_type):
def __init__(self, message: str, dna_name, dna_type) -> None:
super().__init__(message)
self.dna_name = dna_name
self.dna_type = dna_type
@ -70,7 +70,7 @@ class NoWriterImplemented(NotImplementedError):
class SegmentationFault(Exception):
"""Raised when a pointer to a non-existant datablock was dereferenced."""
def __init__(self, message: str, address: int, field_path=None):
def __init__(self, message: str, address: int, field_path=None) -> None:
super().__init__(message)
self.address = address
self.field_path = field_path

View File

@ -18,7 +18,7 @@ class BlendFileHeader:
"""
structure = struct.Struct(b'7s1s1s3s')
def __init__(self, fileobj: typing.BinaryIO, path: pathlib.Path):
def __init__(self, fileobj: typing.IO[bytes], path: pathlib.Path) -> None:
log.debug("reading blend-file-header %s", path)
fileobj.seek(0, os.SEEK_SET)
header = fileobj.read(self.structure.size)

View File

@ -26,7 +26,7 @@ def sequencer_strips(sequence_editor: BlendFileBlock) \
See blender_asset_tracer.cdefs.SEQ_TYPE_xxx for the type numbers.
"""
def iter_seqbase(seqbase) -> typing.Iterator[BlendFileBlock]:
def iter_seqbase(seqbase) -> typing.Iterator[typing.Tuple[BlendFileBlock, int]]:
for seq in listbase(seqbase):
seq.refine_type(b'Sequence')
seq_type = seq[b'type']

View File

@ -2,6 +2,7 @@
import logging
import pathlib
import sys
import typing
from blender_asset_tracer import pack
@ -39,7 +40,7 @@ def cli_pack(args):
raise SystemExit(1)
def paths_from_cli(args) -> (pathlib.Path, pathlib.Path, pathlib.Path):
def paths_from_cli(args) -> typing.Tuple[pathlib.Path, pathlib.Path, pathlib.Path]:
"""Return paths to blendfile, project, and pack target.
Calls sys.exit() if anything is wrong.

View File

@ -19,19 +19,23 @@ class PathAction(enum.Enum):
class AssetAction:
def __init__(self):
"""All the info required to rewrite blend files and copy assets."""
def __init__(self) -> None:
self.path_action = PathAction.KEEP_PATH
self.usages = []
self.usages = [] # type: typing.List[result.BlockUsage]
"""BlockUsage objects referring to this asset.
Those BlockUsage objects could refer to data blocks in this blend file
(if the asset is a blend file) or in another blend file.
"""
self.new_path = None
self.new_path = None # type: pathlib.Path
"""Absolute path to the asset in the BAT Pack."""
self.rewrites = []
self.read_from = None # type: pathlib.Path
self.rewrites = [] # type: typing.List[result.BlockUsage]
"""BlockUsage objects in this asset that may require rewriting.
Empty list if this AssetAction is not for a blend file.
@ -43,7 +47,7 @@ class Packer:
blendfile: pathlib.Path,
project: pathlib.Path,
target: pathlib.Path,
noop=False):
noop=False) -> None:
self.blendfile = blendfile
self.project = project
self.target = target
@ -55,7 +59,8 @@ class Packer:
log.warning('Running in no-op mode, only showing what will be done.')
# Filled by strategise()
self._actions = collections.defaultdict(AssetAction)
self._actions = collections.defaultdict(
AssetAction) # type: typing.DefaultDict[pathlib.Path, AssetAction]
# Number of files we would copy, if not for --noop
self._file_count = 0

View File

@ -11,7 +11,7 @@ log = logging.getLogger(__name__)
class FileCopyError(IOError):
"""Raised when one or more files could not be copied."""
def __init__(self, message, files_not_copied: typing.List[pathlib.Path]):
def __init__(self, message, files_not_copied: typing.List[pathlib.Path]) -> None:
super().__init__(message)
self.files_not_copied = files_not_copied
@ -19,7 +19,7 @@ class FileCopyError(IOError):
class FileCopier(threading.Thread):
"""Copies files in directory order."""
def __init__(self, *args, **kwargs):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
# For copying in a different process. By using a priority queue the files
@ -31,7 +31,8 @@ class FileCopier(threading.Thread):
# maxsize=100 is just a guess as to a reasonable upper limit. When this limit
# is reached, the main thread will simply block while waiting for this thread
# to finish copying a file.
self.file_copy_queue = queue.PriorityQueue(maxsize=100)
self.file_copy_queue = queue.PriorityQueue(
maxsize=100) # type: queue.PriorityQueue[typing.Tuple[pathlib.Path, pathlib.Path]]
self.file_copy_done = threading.Event()
def queue(self, src: pathlib.Path, dst: pathlib.Path):

View File

@ -13,8 +13,8 @@ from . import result, modifier_walkers
log = logging.getLogger(__name__)
_warned_about_types = set()
_funcs_for_code = {}
_warned_about_types = set() # type: typing.Set[bytes]
_funcs_for_code = {} # type: typing.Dict[bytes, typing.Callable]
def iter_assets(block: blendfile.BlendFileBlock) -> typing.Iterator[result.BlockUsage]:

View File

@ -11,7 +11,7 @@ from blender_asset_tracer.blendfile import iterators
# Don't warn about these types at all.
_warned_about_types = {b'LI', b'DATA'}
_funcs_for_code = {}
_funcs_for_code = {} # type: typing.Dict[bytes, typing.Callable]
log = logging.getLogger(__name__)

View File

@ -7,12 +7,13 @@ blend files.
"""
import collections
import logging
import pathlib
import typing
from blender_asset_tracer import blendfile, bpathlib
from . import expanders
_funcs_for_code = {}
_funcs_for_code = {} # type: typing.Dict[bytes, typing.Callable]
log = logging.getLogger(__name__)
@ -23,16 +24,16 @@ class _BlockIterator:
without having to pass those variables to each recursive call.
"""
def __init__(self):
def __init__(self) -> None:
# Set of (blend file Path, block address) of already-reported blocks.
self.blocks_yielded = set()
self.blocks_yielded = set() # type: typing.Set[typing.Tuple[pathlib.Path, int]]
# Queue of blocks to visit
self.to_visit = collections.deque()
self.to_visit = collections.deque() # type: typing.Deque[blendfile.BlendFileBlock]
def iter_blocks(self,
bfile: blendfile.BlendFile,
limit_to: typing.Set[blendfile.BlendFileBlock] = frozenset(),
limit_to: typing.Set[blendfile.BlendFileBlock] = set(),
) -> typing.Iterator[blendfile.BlendFileBlock]:
"""Expand blocks with dependencies from other libraries."""

View File

@ -8,7 +8,7 @@ log = logging.getLogger(__name__)
class DoesNotExist(OSError):
"""Indicates a path does not exist on the filesystem."""
def __init__(self, path: pathlib.Path):
def __init__(self, path: pathlib.Path) -> None:
super().__init__(path)
self.path = path

View File

@ -53,7 +53,8 @@ def _modifier_particle_system(modifier: blendfile.BlendFileBlock, block_name: by
if flag & cdefs.PTCACHE_EXTERNAL:
path, field = pointcache.get(b'path', return_field=True)
yield result.BlockUsage(pointcache, path, path_full_field=field,
bpath = bpathlib.BlendPath(path)
yield result.BlockUsage(pointcache, bpath, path_full_field=field,
is_sequence=True, block_name=block_name)

View File

@ -40,8 +40,8 @@ class BlockUsage:
path_full_field: dna.Field = None,
path_dir_field: dna.Field = None,
path_base_field: dna.Field = None,
block_name: bytes = '',
):
block_name: bytes = b'',
) -> None:
if block_name:
self.block_name = block_name
else:
@ -69,7 +69,9 @@ class BlockUsage:
self.path_full_field = path_full_field
self.path_dir_field = path_dir_field
self.path_base_field = path_base_field
self._abspath = None # cached by __fspath__()
# cached by __fspath__()
self._abspath = None # type: pathlib.Path
@staticmethod
def guess_block_name(block: blendfile.BlendFileBlock) -> bytes:
@ -134,7 +136,7 @@ class BlockUsage:
raise NotImplemented()
return self.block_name < other.block_name and self.block < other.block
def __eq__(self, other: 'BlockUsage'):
def __eq__(self, other: object):
if not isinstance(other, BlockUsage):
return False
return self.block_name == other.block_name and self.block == other.block

View File

@ -3,3 +3,9 @@ addopts = -v --cov blender_asset_tracer --cov-report term-missing
[pep8]
max-line-length = 100
[mypy]
# matches the latest Blender release
python_version = 3.5
warn_redundant_casts = True