diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index e96a7d84696d..8ee7784461c6 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1677,6 +1677,9 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:
         @staticmethod
         CResult[shared_ptr[COutputStream]] Open(const c_string& path)
 
+        @staticmethod
+        CResult[shared_ptr[COutputStream]] Open(int fd)
+
         @staticmethod
         CResult[shared_ptr[COutputStream]] OpenWithAppend" Open"(
             const c_string& path, c_bool append)
@@ -1687,6 +1690,12 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:
         @staticmethod
         CResult[shared_ptr[ReadableFile]] Open(const c_string& path)
 
+        @staticmethod
+        CResult[shared_ptr[ReadableFile]] Open(int fd)
+
+        @staticmethod
+        CResult[shared_ptr[ReadableFile]] Open(int fd, CMemoryPool* memory_pool)
+
         @staticmethod
         CResult[shared_ptr[ReadableFile]] Open(const c_string& path,
                                                CMemoryPool* memory_pool)
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index fd2d4df42ccd..cdd1f36626ad 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -1183,6 +1183,16 @@ cdef class OSFile(NativeFile):
     """
     A stream backed by a regular file descriptor.
 
+    Parameters
+    ----------
+    path : str or int
+        A file path, or an already open file descriptor. A descriptor
+        passed here is closed when the OSFile is closed.
+    mode : str, default 'r'
+        One of 'r'/'rb' (read), 'w'/'wb' (write) or 'a'/'ab' (append).
+    memory_pool : MemoryPool, default None
+        Memory pool handed to the underlying reader in read mode.
+
     Examples
     --------
     Create a new file to write to:
@@ -1228,22 +1238,33 @@ cdef class OSFile(NativeFile):
         object path
 
     def __cinit__(self, path, mode='r', MemoryPool memory_pool=None):
-        _check_is_file(path)
         self.path = path
 
         cdef:
             FileMode c_mode
             shared_ptr[Readable] handle
-            c_string c_path = encode_file_path(path)
-
-        if mode in ('r', 'rb'):
-            self._open_readable(c_path, maybe_unbox_memory_pool(memory_pool))
-        elif mode in ('w', 'wb'):
-            self._open_writable(c_path)
-        elif mode in ('a', 'ab'):
-            self._open_writable(c_path, append=True)
+            c_string c_path
+            int fd
+
+        if isinstance(path, int):
+            fd = path
+            if mode in ('r', 'rb'):
+                self._open_readable_fd(fd, maybe_unbox_memory_pool(memory_pool))
+            elif mode in ('w', 'wb', 'a', 'ab'):
+                self._open_writable_fd(fd, append=(mode in ('a', 'ab')))
+            else:
+                raise ValueError(f'Invalid file mode: {mode}')
         else:
-            raise ValueError(f'Invalid file mode: {mode}')
+            _check_is_file(path)
+            c_path = encode_file_path(path)
+            if mode in ('r', 'rb'):
+                self._open_readable(c_path, maybe_unbox_memory_pool(memory_pool))
+            elif mode in ('w', 'wb'):
+                self._open_writable(c_path)
+            elif mode in ('a', 'ab'):
+                self._open_writable(c_path, append=True)
+            else:
+                raise ValueError(f'Invalid file mode: {mode}')
 
     cdef _open_readable(self, c_string path, CMemoryPool* pool):
         cdef shared_ptr[ReadableFile] handle
@@ -1262,9 +1283,38 @@ cdef class OSFile(NativeFile):
         self.is_writable = True
         self._is_appending = append
 
+    cdef _open_readable_fd(self, int fd, CMemoryPool* pool):
+        cdef shared_ptr[ReadableFile] handle
+
+        with nogil:
+            handle = GetResultValue(ReadableFile.Open(fd, pool))
+
+        self.is_readable = True
+        self.set_random_access_file(<shared_ptr[CRandomAccessFile]> handle)
+
+    cdef _open_writable_fd(self, int fd, c_bool append=False):
+        # NOTE(review): `append` is only recorded for bookkeeping here; the
+        # stream is opened on the descriptor as-is. Confirm callers open the
+        # descriptor in append mode themselves when passing mode='a'.
+        with nogil:
+            self.output_stream = GetResultValue(FileOutputStream.Open(fd))
+        self.is_writable = True
+        self._is_appending = append
+
     def fileno(self):
         self._assert_open()
-        return self.handle.file_descriptor()
+        cdef:
+            shared_ptr[ReadableFile] readable_handle
+            shared_ptr[FileOutputStream] writable_handle
+
+        if self.is_readable:
+            readable_handle = static_pointer_cast[ReadableFile, CRandomAccessFile](
+                self.get_random_access_file())
+            return readable_handle.get().file_descriptor()
+        else:
+            writable_handle = static_pointer_cast[FileOutputStream, COutputStream](
+                self.get_output_stream())
+            return writable_handle.get().file_descriptor()
 
 
 cdef class FixedSizeBufferWriter(NativeFile):
diff --git a/python/pyarrow/tests/parquet/test_basic.py b/python/pyarrow/tests/parquet/test_basic.py
index 4f26f22b100d..01c84b4f3513 100644
--- a/python/pyarrow/tests/parquet/test_basic.py
+++ b/python/pyarrow/tests/parquet/test_basic.py
@@ -133,6 +133,23 @@ def test_memory_map(tempdir):
     assert table_read.equals(table)
 
 
+def test_parquet_read_write_table_raw_fd(tempdir):
+    table = pa.table({'a': [1, 2, 3]})
+    path = str(tempdir / 'raw-fd.parquet')
+    binary_flag = getattr(os, "O_BINARY", 0)
+
+    fd = os.open(path, os.O_CREAT | os.O_WRONLY | os.O_TRUNC | binary_flag,
+                 0o666)
+    with pa.OSFile(fd, mode='wb') as sink:
+        pq.write_table(table, sink)
+
+    fd = os.open(path, os.O_RDONLY | binary_flag)
+    with pa.OSFile(fd, mode='rb') as source:
+        result = pq.read_table(source)
+
+    assert result.equals(table)
+
+
 @pytest.mark.pandas
 def test_enable_buffered_stream(tempdir):
     df = alltypes_sample(size=10)
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index a6d3546e57c6..824c69c98982 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -17,6 +17,7 @@
 
 import bz2
 from contextlib import contextmanager
+import errno
 from io import (BytesIO, StringIO, TextIOWrapper, BufferedIOBase, IOBase)
 import itertools
 import gc
@@ -1280,6 +1281,30 @@ def test_os_file_writer(tmpdir):
         assert f5.size() == 6  # foo + bar
 
 
+def test_os_file_raw_fd(tmpdir):
+    path = os.path.join(str(tmpdir), guid())
+    binary_flag = getattr(os, "O_BINARY", 0)
+
+    fd = os.open(path, os.O_CREAT | os.O_WRONLY | os.O_TRUNC | binary_flag,
+                 0o666)
+    with pa.OSFile(fd, mode='wb') as f:
+        assert f.fileno() == fd
+        f.write(b'foo')
+
+    with pytest.raises(OSError) as exc:
+        os.fstat(fd)
+    assert exc.value.errno == errno.EBADF
+
+    fd = os.open(path, os.O_RDONLY | binary_flag)
+    with pa.OSFile(fd, mode='rb') as f:
+        assert f.fileno() == fd
+        assert f.read() == b'foo'
+
+    with pytest.raises(OSError) as exc:
+        os.fstat(fd)
+    assert exc.value.errno == errno.EBADF
+
+
 def test_native_file_write_reject_unicode():
     # ARROW-3227
     nf = pa.BufferOutputStream()