Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions python/ndstorage/ndtiff_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class NDTiffDataset(NDStorageBase, WritableNDStorageAPI):
"""

def __init__(self, dataset_path=None, file_io: NDTiffFileIO = BUILTIN_FILE_IO, summary_metadata=None,
name=None, writable=False, **kwargs):
name=None, writable=False, pixel_compression = 1, **kwargs):
"""
Provides access to an NDTiffStorage dataset,
either one currently being acquired or one on disk
Expand All @@ -44,10 +44,15 @@ def __init__(self, dataset_path=None, file_io: NDTiffFileIO = BUILTIN_FILE_IO, s

self.file_io = file_io
self._lock = threading.RLock()
self._put_image_lock = threading.Lock()
if writable:
self.major_version = MAJOR_VERSION
self.minor_version = MINOR_VERSION
self._index_file = None
if pixel_compression in [1,8]:
self._pixel_compression = pixel_compression
else:
raise ValueError("Compression scheme must be 1 (No) or 8 (zlib)")
if summary_metadata is not None or writable:
# this dataset is either:
# - a view of an active acquisition. Image data is being written by code on the Java side
Expand Down Expand Up @@ -165,10 +170,18 @@ def read_metadata(self, channel=None, z=None, time=None, position=None, row=None

return self._do_read_metadata(axes)

def put_image(self, coordinates, image, metadata):
def put_image(self, coordinates, image, metadata, pixel_compression = 0):
# wait for put_image to finish before calling it again.
self._put_image_lock.acquire()
if not self._writable:
raise RuntimeError("Cannot write to a read-only dataset")

if pixel_compression == 0:
pixel_compression = self._pixel_compression
elif not pixel_compression in [1,8]:
warnings.warn(f"Pixel compression {pixel_compression}: only 1 (no compression) and 8 (zlib) are supported. Using {self._pixel_compression}.")
pixel_compression = self._pixel_compression

# add to write pending images
self._write_pending_images[frozenset(coordinates.items())] = (image, metadata)

Expand All @@ -184,7 +197,7 @@ def put_image(self, coordinates, image, metadata):
filename = 'NDTiffStack.tif'
if self.name is not None:
filename = self.name + '_' + filename
self.current_writer = SingleNDTiffWriter(self.path, filename, self._summary_metadata)
self.current_writer = SingleNDTiffWriter(self.path, filename, self._summary_metadata, self._pixel_compression)
self.file_index += 1
# create the index file
self._index_file = open(os.path.join(self.path, "NDTiff.index"), "wb")
Expand All @@ -193,16 +206,17 @@ def put_image(self, coordinates, image, metadata):
filename = 'NDTiffStack_{}.tif'.format(self.file_index)
if self.name is not None:
filename = self.name + '_' + filename
self.current_writer = SingleNDTiffWriter(self.path, filename, self._summary_metadata)
self.current_writer = SingleNDTiffWriter(self.path, filename, self._summary_metadata, self._pixel_compression)
self.file_index += 1

index_data_entry = self.current_writer.write_image(frozenset(coordinates.items()), image, metadata)
index_data_entry = self.current_writer.write_image(frozenset(coordinates.items()), image, metadata, pixel_compression = pixel_compression)
# create readers and update axes
self.add_index_entry(index_data_entry, new_image_updates=False)
# write the index to disk
self._index_file.write(index_data_entry.as_byte_buffer().getvalue())
# remove from pending images
del self._write_pending_images[frozenset(coordinates.items())]
self._put_image_lock.release()

def finish(self):
if self.current_writer is not None:
Expand Down Expand Up @@ -258,7 +272,11 @@ def add_index_entry(self, data, new_image_updates=True):
self.index[frozenset(image_coordinates.items())] = index_entry

if index_entry.filename not in self._readers_by_filename:
new_reader = SingleNDTiffReader(os.path.join(self.path, index_entry.filename), file_io=self.file_io)
# prevent new reader object when writing:
if self._writable and self.current_writer.filename.split(os.sep)[-1] == index_entry.filename:
new_reader = self.current_writer.reader
else:
new_reader = SingleNDTiffReader(os.path.join(self.path, index_entry.filename), file_io=self.file_io)
self._readers_by_filename[index_entry.filename] = new_reader
# Should be the same on every file so resetting them is fine
self.major_version, self.minor_version = new_reader.major_version, new_reader.minor_version
Expand Down
70 changes: 56 additions & 14 deletions python/ndstorage/ndtiff_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import time
import struct
import warnings
import mmap
import zlib
from collections import OrderedDict
from io import BytesIO
from .file_io import NDTiffFileIO, BUILTIN_FILE_IO
Expand Down Expand Up @@ -49,7 +51,7 @@

class SingleNDTiffWriter:

def __init__(self, directory, filename, summary_md):
def __init__(self, directory, filename, summary_md, pixel_compression = 1):
self.filename = os.path.join(directory, filename)
self.index_map = {}
self.next_ifd_offset_location = -1
Expand All @@ -59,10 +61,15 @@ def __init__(self, directory, filename, summary_md):
self.buffers = deque()
self.first_ifd = True

self.start_time = None
if pixel_compression in [1, 8]:
self.pixel_compression = pixel_compression
else:
raise ValueError("Invalid pixel compression, only 1 (no compression) and 8 (zlib) are supported")

self.start_time = None

os.makedirs(directory, exist_ok=True)
# pre-allocate the file
# pre-allocate the file
file_path = os.path.join(directory, filename)
with open(file_path, 'wb') as f:
f.seek(MAX_FILE_SIZE - 1)
Expand Down Expand Up @@ -132,12 +139,12 @@ def _write_null_offset_after_last_image(self):
self.file.write(buffer)
self.file.seek(current_pos)

def write_image(self, index_key, pixels, metadata, bit_depth='auto'):
def write_image(self, index_key, pixels, metadata, bit_depth='auto', pixel_compression = 0):
"""
Write an image to the file

Parameters
----------
----------
index_key : frozenset
The key to index the image
pixels : np.ndarray or bytearray
Expand All @@ -152,14 +159,25 @@ def write_image(self, index_key, pixels, metadata, bit_depth='auto'):
NDTiffIndexEntry
The index entry for the image
"""
if pixel_compression == 0:
pixel_compression = self.pixel_compression

image_height, image_width = pixels.shape
rgb = pixels.ndim == 3 and pixels.shape[2] == 3

if rgb and pixel_compression in [8]:
warnings.warn(f"Pixel compression {pixel_compression} is not supported for RGB images. Using no compression.")
pixel_compression = 1
if not pixel_compression in [1,8]:
warnings.warn(f"Invalid pixel compression {pixel_compression}: only 1 (no compression) and 8 (zlib) are supported. Using 1 (no compression).")
pixel_compression = 1

if bit_depth == 'auto':
bit_depth = 8 if pixels.dtype == np.uint8 else 16
# if metadata is a dict, serialize it to a json string and make it a utf8 byte buffer
if isinstance(metadata, dict):
metadata = self._get_bytes_from_string(json.dumps(metadata))
ied = self._write_ifd(index_key, pixels, metadata, rgb, image_height, image_width, bit_depth)
ied = self._write_ifd(index_key, pixels, metadata, rgb, image_height, image_width, bit_depth, pixel_compression)
while self.buffers:
self.file.write(self.buffers.popleft())
# make sure the file is flushed to disk
Expand All @@ -168,12 +186,26 @@ def write_image(self, index_key, pixels, metadata, bit_depth='auto'):
return ied


def _write_ifd(self, index_key, pixels, metadata, rgb, image_height, image_width, bit_depth):
def _write_ifd(self, index_key, pixels, metadata, rgb, image_height, image_width, bit_depth, pixel_compression):
if self.file.tell() % 2 == 1:
self.file.seek(self.file.tell() + 1) # Make IFD start on word

byte_depth = 1 if isinstance(pixels, bytearray) else 2
bytes_per_image_pixels = self._bytes_per_image_pixels(pixels, rgb)
if isinstance(pixels, bytearray):
byte_depth = 1
# if the pixel object is a numpy array, it is type of <class 'numpy.ndarray'>
# when using np_array.tobytes it is <class 'bytes'>
# therefore taking the the bit_depth information "pixels.dtype" into account
elif bit_depth == 8:
byte_depth = 1
else:
byte_depth = 2

if pixel_compression == 8:
compressed_pixels = zlib.compress(pixels)
bytes_per_image_pixels = len(compressed_pixels)
else:
bytes_per_image_pixels = self._bytes_per_image_pixels(pixels, rgb)

num_entries = 13

# 2 bytes for number of directory entries, 12 bytes per directory entry, 4 byte offset of next IFD
Expand Down Expand Up @@ -202,7 +234,7 @@ def _write_ifd(self, index_key, pixels, metadata, rgb, image_height, image_width
buffer_position += self._write_ifd_entry(ifd_and_small_vals_buffer, HEIGHT, 4, 1, image_height, buffer_position)
buffer_position += self._write_ifd_entry(ifd_and_small_vals_buffer, BITS_PER_SAMPLE, 3, 3 if rgb else 1,
bits_per_sample_offset if rgb else byte_depth * 8, buffer_position)
buffer_position += self._write_ifd_entry(ifd_and_small_vals_buffer, COMPRESSION, 3, 1, 1, buffer_position)
buffer_position += self._write_ifd_entry(ifd_and_small_vals_buffer, COMPRESSION, 3, 1, pixel_compression, buffer_position)
buffer_position += self._write_ifd_entry(ifd_and_small_vals_buffer, PHOTOMETRIC_INTERPRETATION, 3, 1,
2 if rgb else 1, buffer_position)
buffer_position += self._write_ifd_entry(ifd_and_small_vals_buffer, STRIP_OFFSETS, 4, 1, pixel_data_offset,
Expand Down Expand Up @@ -235,7 +267,10 @@ def _write_ifd(self, index_key, pixels, metadata, rgb, image_height, image_width
buffer_position += 8

self.buffers.append(ifd_and_small_vals_buffer)
self.buffers.append(self._get_pixel_buffer(pixels, rgb))
if pixel_compression in [8]:
self.buffers.append(compressed_pixels)
else:
self.buffers.append(self._get_pixel_buffer(pixels, rgb))
self.buffers.append(metadata)

self.first_ifd = False
Expand All @@ -251,7 +286,7 @@ def _write_ifd(self, index_key, pixels, metadata, rgb, image_height, image_width
}.get(bit_depth, NDTiffIndexEntry.EIGHT_BIT_RGB if rgb else None)

return NDTiffIndexEntry(index_key, pixel_type, pixel_data_offset, image_width, image_height, metadata_offset,
len(metadata), self.filename.split(os.sep)[-1])
len(metadata), self.filename.split(os.sep)[-1], pixel_compression)

def _write_ifd_entry(self, buffer, tag, dtype, count, value, buffer_position):
struct.pack_into('<HHII', buffer, buffer_position, tag, dtype, count, value)
Expand Down Expand Up @@ -300,6 +335,8 @@ class SingleNDTiffReader:
FOURTEEN_BIT_MONOCHROME = 5
ELEVEN_BIT_MONOCHROME = 6

ZLIB_COMPRESSED = 8
NO_COMPRESSION = 1
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems redundant to UNCOMPRESSED. If NO_COMPRESSION is a standard name, maybe good to remove uncompressed

UNCOMPRESSED = 0

def __init__(self, tiff_path, file_io: NDTiffFileIO = BUILTIN_FILE_IO, summary_md=None):
Expand All @@ -314,6 +351,8 @@ def __init__(self, tiff_path, file_io: NDTiffFileIO = BUILTIN_FILE_IO, summary_m
self.file_io = file_io
self.tiff_path = tiff_path
self.file = self.file_io.open(tiff_path, "rb")
# mmap speeds up random access
self.mmap_file = mmap.mmap(self.file.fileno(), 0, prot=mmap.PROT_READ)
if summary_md is None:
self.summary_md, self.first_ifd_offset = self._read_header()
else:
Expand All @@ -323,6 +362,7 @@ def __init__(self, tiff_path, file_io: NDTiffFileIO = BUILTIN_FILE_IO, summary_m

def close(self):
""" """
self.mmap_file.close()
self.file.close()

def _read_header(self):
Expand Down Expand Up @@ -364,8 +404,8 @@ def _read(self, start, end):
"""
convert to python ints
"""
self.file.seek(int(start), 0)
return self.file.read(end - start)
self.mmap_file.seek(int(start), 0)
return self.mmap_file.read(end - start)

def read_metadata(self, index):
return json.loads(
Expand Down Expand Up @@ -393,6 +433,8 @@ def read_image(self, index_entry):
width = index_entry.image_width
height = index_entry.image_height
data = self._read(index_entry.pix_offset, index_entry.pix_offset + width * height * bytes_per_pixel)
if index_entry.pixel_compression == self.ZLIB_COMPRESSED:
data = zlib.decompress(data)
pixels = np.frombuffer(data, dtype=dtype)
image = pixels.reshape([height, width, 3] if bytes_per_pixel == 3 else [height, width])
return image
8 changes: 5 additions & 3 deletions python/ndstorage/ndtiff_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,19 @@ class NDTiffIndexEntry:
FOURTEEN_BIT = 5
ELEVEN_BIT = 6

ZLIB_COMPRESSED = 8
NO_COMPRESSION = 1
UNCOMPRESSED = 0

def __init__(self, axes_key, pixel_type, pix_offset, image_width, image_height, md_offset, md_length, filename):
def __init__(self, axes_key, pixel_type, pix_offset, image_width, image_height, md_offset, md_length, filename, pixel_compression):
self.axes_key = axes_key
self.pix_offset = pix_offset
self.image_width = image_width
self.image_height = image_height
self.metadata_length = md_length
self.metadata_offset = md_offset
self.pixel_type = pixel_type
self.pixel_compression = self.UNCOMPRESSED
self.pixel_compression = pixel_compression
self.metadata_compression = self.UNCOMPRESSED
self.filename = filename
self.data_set_finished_entry = axes_key is None
Expand Down Expand Up @@ -110,7 +112,7 @@ def unpack_single_index_entry(data, position=0):
metadata_offset, metadata_length, metadata_compression = \
struct.unpack("IIIIIIII", data[position: position + 32])
index_entry = NDTiffIndexEntry(axes.items, pixel_type, pixel_offset, image_width, image_height,
metadata_offset, metadata_length, filename)
metadata_offset, metadata_length, filename, pixel_compression)
position += 32
return position, axes, index_entry

Expand Down
80 changes: 80 additions & 0 deletions python/ndstorage/test/writing_multithreaded_zlib_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import numpy as np
import os
import shutil
import threading
import time
import sys
from collections import deque
#from ..ndtiff_file import SingleNDTiffWriter, SingleNDTiffReader
from ..ndtiff_dataset import NDTiffDataset
#from ..ndram_dataset import NDRAMDataset
import pytest

@pytest.fixture(scope="function")
def test_data_path(tmp_path_factory):
data_path = tmp_path_factory.mktemp('writer_tests')
for f in os.listdir(data_path):
os.remove(os.path.join(data_path, f))
yield str(data_path)
shutil.rmtree(data_path)

# loop for threaded writing
@pytest.mark.skip(reason="loop for threaded writing")
def image_write_loop(my_deque: deque, dataset: NDTiffDataset, run_event: threading.Event):
while run_event.is_set() or len(my_deque) != 0:
try:
if my_deque:
current_time, pixels = my_deque.popleft()
axes = {'time': current_time}
dataset.put_image(axes, pixels, {'time_metadata': current_time})
else:
time.sleep(0.001)
except IndexError:
break

#@pytest.mark.skipif(sys.version_info < (3, 13), reason="For test_write_multithreaded_zlib Python >= 3.13 is recommended")
def test_write_multithreaded_zlib(test_data_path):
"""
Write an NDTiff dataset and read it back in, testing pixels and metadata
"""
#assert sys.version_info[0] >= 3, "For test_write_multithreaded_zlib Python >= 3.13 is recommended"
#assert sys.version_info[1] >= 13, "For test_write_multithreaded_zlib Python >= 3.13 is recommended"

full_path = os.path.join(test_data_path, 'test_write_full_dataset')
dataset = NDTiffDataset(full_path, summary_metadata={}, writable=True, pixel_compression=8)
image_deque = deque()
run_event = threading.Event()
run_event.set()

image_height = 256
image_width = 256

thread = threading.Thread(target=image_write_loop, args=(image_deque, dataset, run_event))
thread.start()

time_counter = 0
time_limit = 10

while True:
if len(image_deque) < 4:
pixels = np.ones(image_height * image_width, dtype=np.uint16).reshape((image_height, image_width)) * time_counter
image_deque.append((time_counter, pixels))
time_counter += 1
if time_counter >= time_limit:
break
else:
time.sleep(0.001) # if the deque is full, wait a bit

run_event.clear()
thread.join()
dataset.finish()

# read the file back in
dataset = NDTiffDataset(full_path)
for time_index in range(time_limit):
pixels = np.ones(image_height * image_width, dtype=np.uint16).reshape((image_height, image_width)) * time_index
axes = {'time': time_index}
read_image = dataset.read_image(**axes)
assert np.all(read_image == pixels)
assert dataset.read_metadata(**axes) == {'time_metadata': time_index}