Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -1677,6 +1677,9 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:
@staticmethod
CResult[shared_ptr[COutputStream]] Open(const c_string& path)

@staticmethod
CResult[shared_ptr[COutputStream]] Open(int fd)

@staticmethod
CResult[shared_ptr[COutputStream]] OpenWithAppend" Open"(
const c_string& path, c_bool append)
Expand All @@ -1687,6 +1690,12 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:
@staticmethod
CResult[shared_ptr[ReadableFile]] Open(const c_string& path)

@staticmethod
CResult[shared_ptr[ReadableFile]] Open(int fd)

@staticmethod
CResult[shared_ptr[ReadableFile]] Open(int fd, CMemoryPool* memory_pool)

@staticmethod
CResult[shared_ptr[ReadableFile]] Open(const c_string& path,
CMemoryPool* memory_pool)
Expand Down
64 changes: 53 additions & 11 deletions python/pyarrow/io.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -1183,6 +1183,11 @@ cdef class OSFile(NativeFile):
"""
A stream backed by a regular file descriptor.

Parameters
----------
path : str or int
A file path or an open file descriptor.

Examples
--------
Create a new file to write to:
Expand Down Expand Up @@ -1228,22 +1233,33 @@ cdef class OSFile(NativeFile):
object path

def __cinit__(self, path, mode='r', MemoryPool memory_pool=None):
_check_is_file(path)
self.path = path

cdef:
FileMode c_mode
shared_ptr[Readable] handle
c_string c_path = encode_file_path(path)

if mode in ('r', 'rb'):
self._open_readable(c_path, maybe_unbox_memory_pool(memory_pool))
elif mode in ('w', 'wb'):
self._open_writable(c_path)
elif mode in ('a', 'ab'):
self._open_writable(c_path, append=True)
c_string c_path
int fd

if isinstance(path, int):
fd = path
if mode in ('r', 'rb'):
self._open_readable_fd(fd, maybe_unbox_memory_pool(memory_pool))
elif mode in ('w', 'wb', 'a', 'ab'):
self._open_writable_fd(fd, append=(mode in ('a', 'ab')))
else:
raise ValueError(f'Invalid file mode: {mode}')
else:
raise ValueError(f'Invalid file mode: {mode}')
_check_is_file(path)
c_path = encode_file_path(path)
if mode in ('r', 'rb'):
self._open_readable(c_path, maybe_unbox_memory_pool(memory_pool))
elif mode in ('w', 'wb'):
self._open_writable(c_path)
elif mode in ('a', 'ab'):
self._open_writable(c_path, append=True)
else:
raise ValueError(f'Invalid file mode: {mode}')

cdef _open_readable(self, c_string path, CMemoryPool* pool):
cdef shared_ptr[ReadableFile] handle
Expand All @@ -1262,9 +1278,35 @@ cdef class OSFile(NativeFile):
self.is_writable = True
self._is_appending = append

cdef _open_readable_fd(self, int fd, CMemoryPool* pool):
cdef shared_ptr[ReadableFile] handle

with nogil:
handle = GetResultValue(ReadableFile.Open(fd, pool))

self.is_readable = True
self.set_random_access_file(<shared_ptr[CRandomAccessFile]> handle)

cdef _open_writable_fd(self, int fd, c_bool append=False):
with nogil:
self.output_stream = GetResultValue(FileOutputStream.Open(fd))
self.is_writable = True
self._is_appending = append

def fileno(self):
self._assert_open()
return self.handle.file_descriptor()
cdef:
shared_ptr[ReadableFile] readable_handle
shared_ptr[FileOutputStream] writable_handle

if self.is_readable:
readable_handle = static_pointer_cast[ReadableFile, CRandomAccessFile](
self.get_random_access_file())
return readable_handle.get().file_descriptor()
else:
writable_handle = static_pointer_cast[FileOutputStream, COutputStream](
self.get_output_stream())
return writable_handle.get().file_descriptor()


cdef class FixedSizeBufferWriter(NativeFile):
Expand Down
17 changes: 17 additions & 0 deletions python/pyarrow/tests/parquet/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,23 @@ def test_memory_map(tempdir):
assert table_read.equals(table)


def test_parquet_read_write_table_raw_fd(tempdir):
table = pa.table({'a': [1, 2, 3]})
path = str(tempdir / 'raw-fd.parquet')
binary_flag = getattr(os, "O_BINARY", 0)

fd = os.open(path, os.O_CREAT | os.O_WRONLY | os.O_TRUNC | binary_flag,
0o666)
with pa.OSFile(fd, mode='wb') as sink:
pq.write_table(table, sink)

fd = os.open(path, os.O_RDONLY | binary_flag)
with pa.OSFile(fd, mode='rb') as source:
result = pq.read_table(source)

assert result.equals(table)


@pytest.mark.pandas
def test_enable_buffered_stream(tempdir):
df = alltypes_sample(size=10)
Expand Down
25 changes: 25 additions & 0 deletions python/pyarrow/tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import bz2
from contextlib import contextmanager
import errno
from io import (BytesIO, StringIO, TextIOWrapper, BufferedIOBase, IOBase)
import itertools
import gc
Expand Down Expand Up @@ -1280,6 +1281,30 @@ def test_os_file_writer(tmpdir):
assert f5.size() == 6 # foo + bar


def test_os_file_raw_fd(tmpdir):
path = os.path.join(str(tmpdir), guid())
binary_flag = getattr(os, "O_BINARY", 0)

fd = os.open(path, os.O_CREAT | os.O_WRONLY | os.O_TRUNC | binary_flag,
0o666)
with pa.OSFile(fd, mode='wb') as f:
assert f.fileno() == fd
f.write(b'foo')

with pytest.raises(OSError) as exc:
os.fstat(fd)
assert exc.value.errno == errno.EBADF

fd = os.open(path, os.O_RDONLY | binary_flag)
with pa.OSFile(fd, mode='rb') as f:
assert f.fileno() == fd
assert f.read() == b'foo'

with pytest.raises(OSError) as exc:
os.fstat(fd)
assert exc.value.errno == errno.EBADF


def test_native_file_write_reject_unicode():
# ARROW-3227
nf = pa.BufferOutputStream()
Expand Down
Loading