Conversation
src/engine.rs
Outdated
| new_block_flags.push(false); | ||
| } | ||
| } | ||
| let mut a_list: Vec<aiocb> = Vec::with_capacity(block_sum); |
There was a problem hiding this comment.
Hide aiocb inside AioContext (it holds Vec<aiocb> and create new ones when a new read request is issued via fs::read_async). No one other than the DefaultFileSystem should have access to aiocb.
There was a problem hiding this comment.
Do you mean to use one AioContext to manage all read requests? Then I need to redesign the AioContext, like this
pub struct AioContext{
fd_vec: Vec<Arc<LogFd>>,
aio_vec: Vec<aiocb>,
buf_vec: Vec<Vec<u8>>,
}
There was a problem hiding this comment.
I'm not sure if there's a performance difference between waiting on each aiocb separately, or waiting all of them in one single syscall (maybe you can benchmark it too).
If there's no difference, then it makes sense to create new AioContext for each request. But either way, all the details must be put inside the file system implementation. E.g.
impl AioContext {
pub fn wait() -> Result<()>;
// UB if `wait()` is not called and returns `Ok(())`.
pub fn data() -> &[u8];
}
76f4403 to
193b92e
Compare
src/env/default.rs
Outdated
| } | ||
|
|
||
| pub fn read_aio(&self, seq: usize, ctx: &mut AioContext, offset: u64) { | ||
| let mut buf = ctx.buf_vec.last().unwrap().lock().unwrap(); |
There was a problem hiding this comment.
The lock is meaningless. The mutable reference (&mut AioContext) ensures there's only one thread accessing the ctx.
Of course, due to the unsafe block, the buf_vec is in fact "leaked" to aio code. That's why we must manually guarantee there's no one reading or writing to buf_vec after calling aio_read.
src/env/default.rs
Outdated
|
|
||
| impl AsyncContext for AioContext { | ||
| fn single_wait(&mut self, seq: usize) -> IoResult<usize> { | ||
| let buf_len = self.buf_vec[seq].lock().unwrap().len(); |
There was a problem hiding this comment.
ditto, the &mut self has the same effect.
src/env/default.rs
Outdated
| &self, | ||
| handle: Arc<Self::Handle>, | ||
| seq: usize, | ||
| ctx: &mut AioContext, |
There was a problem hiding this comment.
AioContext should be a generic type, different file system implementation can have different context.
The code will look like:
trait WaitData {
fn wait(&mut self, index: usize) -> &[u8];
}
trait FileSystem {
type AsyncContext: WaitData;
fn new_async_context(&self) -> Self::AsyncContext;
}
impl FileSystem for DefaultFileSystem {
type AsyncContext = AioContext;
fn new_async_context(&self) {
AioContext::new()
}
}
src/env/mod.rs
Outdated
| &self, | ||
| handle: Arc<Self::Handle>, | ||
| seq: usize, | ||
| ctx: &mut AioContext, |
There was a problem hiding this comment.
You are still leaking details of ctx. Here you should emulate the use of Writer:
type AsyncContext: AioContext;
fn read_async(&self, handle, ctx: &mut Self::AsyncContext);
fn new_async_context(&self) -> Self::AsyncContext;
tabokie
left a comment
There was a problem hiding this comment.
- Please run the checks locally before submitting a commit, as a contributor the basic level of respect is to pass CI before requesting reviews.
- A good PR description should contain three parts: the problem, the solution, the verification method (test plan).
| } | ||
| } | ||
|
|
||
| pub struct AioContext { |
There was a problem hiding this comment.
Don't use this name outside this file. Just like type Handle = <DefaultFileSystem as FileSystem>::Handle;, you can use the same syntax to reference aio context of base file system without needing to expose this struct.
src/pipe_log.rs
Outdated
| ) -> Result<()>; | ||
|
|
||
| /// Reads bytes from multi blocks using 'Async IO'. | ||
| fn async_read_bytes( |
There was a problem hiding this comment.
There's no need to create a function that isn't used anywhere. Now, you need to make fetch_entries_to_aio call this function and remove async_entry_read, because (1) concept of entry should not be exposed to pipe log (2) we need the raw bytes to populate block cache (this part can be implemented later in a different PR maybe)
There was a problem hiding this comment.
@tabokie Now my implementation idea is to read all bytes through async_entry_read, and then parse them in turn. like
pub fn fetch_entries_to_aio(){
...
let bytes = async_entry_read();
for (idx,i) in ents_idx.iter().enumerate(){
entry = parse_from_bytes(byte[idx]);
vec.push(entry);
}
...
}
Does it ok?
There was a problem hiding this comment.
Yes, that's what I meant. But the name should not be async_entry_read, the underlying pipe is not aware of the "entry" concept.
src/env/default.rs
Outdated
| } | ||
| } | ||
|
|
||
| impl AsyncContext for AioContext { |
There was a problem hiding this comment.
The current implementation wouldn't work with custom file system such as ObfuscatedFileSystem. If you add a test that reads async with obfuscated fs, the result would be wrong.
Also, another minor detail is, it's not intuitive to have async context be used both passively and actively. i.e. fs::new_reader(ctx, handle) should not coexist with ctx::wait().
Let's do this instead:
pub trait FileSystem {
pub type AsyncContext;
fn async_read(&self, ctx: &mut Self::AsyncContext, handle: Arc<Self::Handle>, block: FileBlockHandle) -> Result<()>;
fn async_finish(&self, ctx: Self::AsyncContext) -> Result<Vec<Vec<u8>>;
}
// for obfuscated.rs
pub struct ObfuscatedContext(<DefaultFileSystem as FileSystem>::AsyncIoContext);
impl FileSystem for ObfuscatedFileSystem {
fn async_finish(&self, ctx: Self::AsyncContext) -> Result<Vec<Vec<u8>>> {
let base = self.0.async_finish(ctx.0)?;
for v in &mut base {
// do obfuscation
for c in v {
c.wrapping_sub(1);
}
}
}
}There was a problem hiding this comment.
@tabokie Whether it is necessary to consider such situations? That is in an fetch_entries_to_aio call, some handles belong to the Append queue and others belong to the Rewrite queue. They correspond to different files_ system, then you need to call 2 times async_finish().In more complex cases, they are interspersed, like Append, Rewrite, Append, Rewrite, Rewrite...,then the design of async_finish() will doesn't work.
There was a problem hiding this comment.
Like this:
impl DualPipes {
fn read_async(&self, handls: Vec<FileBlockHandle>) ->Vec<Vec<u8>> {
let mut ctx = fs.new_context();
for handle in handles {
fs.read_async(&mut ctx, handle);
}
fs.async_finish(ctx);
}
}
There was a problem hiding this comment.
@tabokie How can I determine which fs to use? Use self.pipes[LogQueue::Append].file_system, or self.pipes[LogQueue::Rewrite].file_system, or both?
There was a problem hiding this comment.
Either one is fine. They are always the same.
src/env/default.rs
Outdated
| fn single_wait(&mut self, seq: usize) -> IoResult<usize> { | ||
| let buf_len = self.buf_vec[seq].len(); | ||
| unsafe { | ||
| loop { |
There was a problem hiding this comment.
What if the read hits EOF? It will loop forever? This needs testing as well.
Signed-off-by: root <root@WxyR9P.localdomain> Signed-off-by: root <1019495690@qq.com>
Signed-off-by: root <root@WxyR9P.localdomain> Signed-off-by: root <1019495690@qq.com>
Signed-off-by: root <root@WxyR9P.localdomain> Signed-off-by: root <1019495690@qq.com>
Signed-off-by: root <root@WxyR9P.localdomain> Signed-off-by: root <1019495690@qq.com>
Signed-off-by: root <1019495690@qq.com>
Signed-off-by: root <1019495690@qq.com>
|
I have completed the code modification according to your comments and run the unit test locally, all of them have passed. PTAL. @tabokie |
|
@ustc-wxy format and clippy failed. |
Signed-off-by: root <1019495690@qq.com>
Codecov ReportPatch coverage:
Additional details and impacted files@@ Coverage Diff @@
## master #286 +/- ##
==========================================
- Coverage 97.74% 97.40% -0.34%
==========================================
Files 30 30
Lines 11287 11668 +381
==========================================
+ Hits 11032 11365 +333
- Misses 255 303 +48
... and 1 file with indirect coverage changes Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here. ☔ View full report in Codecov by Sentry. |
src/pipe_log.rs
Outdated
| fn read_bytes(&self, handle: FileBlockHandle) -> Result<Vec<u8>>; | ||
|
|
||
| /// Reads bytes from multi blocks using 'Async IO'. | ||
| fn async_read_bytes(&self, ents_idx: &mut Vec<EntryIndex>) -> Result<Vec<Vec<u8>>>; |
There was a problem hiding this comment.
As mentioned before, pipe is not aware of the "entry" concept. Here should use Vec<FileBlockHandle>.
src/file_pipe_log/pipe.rs
Outdated
| self.pipes[LogQueue::Append as usize].async_read(block, &mut ctx); | ||
| } | ||
| LogQueue::Rewrite => { | ||
| self.pipes[LogQueue::Rewrite as usize].async_read(block, &mut ctx); |
There was a problem hiding this comment.
Change the function to async_read(ctx, buf). It's the common order to pass in a context.
There was a problem hiding this comment.
@tabokie Did you mean async_read(ctx, blocks)? Like
impl<F: FileSystem> PipeLog for DualPipes<F> {
fn async_read_bytes(blocks:Vec<FileBlockHandle>) -> Result<Vec<Vec<u8>>>{
...
self.pipes[LogQueue::Append].async_read(ctx,blocks);
let bytes = fs.async_finish(ctx);
...
}
}
src/file_pipe_log/pipe.rs
Outdated
| } | ||
| } | ||
| } | ||
| let res = fs.async_finish(&mut ctx).unwrap(); |
There was a problem hiding this comment.
Change it to async_finish(ctx: Context) (instead of async_finish(ctx: &mut Context)). This makes sure the context isn't reused afterwards.
src/file_pipe_log/pipe.rs
Outdated
|
|
||
| #[inline] | ||
| fn async_read_bytes(&self, ents_idx: &mut Vec<EntryIndex>) -> Result<Vec<Vec<u8>>> { | ||
| let mut blocks: Vec<FileBlockHandle> = vec![]; |
There was a problem hiding this comment.
Won't this compile? let mut blocks = Vec::new();
src/file_pipe_log/pipe.rs
Outdated
| reader.read(handle) | ||
| } | ||
|
|
||
| fn async_read(&self, block: &mut FileBlockHandle, ctx: &mut F::AsyncIoContext) { |
There was a problem hiding this comment.
There's no need for the block to be mutable.
src/file_pipe_log/pipe.rs
Outdated
| let fd = self.get_fd(block.id.seq).unwrap(); | ||
| let buf = vec![0_u8; block.len]; | ||
|
|
||
| self.file_system.async_read(ctx, fd, buf, block).unwrap(); |
There was a problem hiding this comment.
Why not creating the vector inside async_read? So there's one less argument to pass into (and validate).
src/env/obfuscated.rs
Outdated
| type Handle = <DefaultFileSystem as FileSystem>::Handle; | ||
| type Reader = ObfuscatedReader; | ||
| type Writer = ObfuscatedWriter; | ||
| type AsyncIoContext = ObfuscatedContext; |
There was a problem hiding this comment.
Why? Doesn't this work already?
type AsyncIoContext = <DefaultFileSystem as FileSystem>::AsyncIoContext
There was a problem hiding this comment.
@tabokie I introduced the ObfuscatedContext struct based on your comments:#286 (comment)
There was a problem hiding this comment.
Adding a layer of wrapper is only useful if you have implemented something on top of the base context. Right now you didn't implement anything, so there is no need to wrap it.
src/env/obfuscated.rs
Outdated
| fn wait(&mut self) -> IoResult<usize> { | ||
| self.0.wait() | ||
| } |
There was a problem hiding this comment.
These codes are not covered, meaning you didn't test ObfuscatedFileSystem in unit tests.
src/env/default.rs
Outdated
| } | ||
| } | ||
|
|
||
| pub fn set_fd(&mut self, fd: Arc<LogFd>) { |
There was a problem hiding this comment.
I understand this is to hide the LogFd. But it isn't necessary if you treat AsyncContext as a data-only struct (inline all code into file system, and remove the trait AsyncContext entirely). i.e.
impl SomeFileSystem {
fn async_read(ctx: &mut Self::Context, handle: Self::Handle, block: FileBlockHandle) {
handle.submit_async_read(ctx.buf[i], ...)?;
}
}
src/env/default.rs
Outdated
| for _ in 0..block_sum { | ||
| aio_vec.push(mem::zeroed::<libc::aiocb>()); | ||
| } |
There was a problem hiding this comment.
Why creating them before hand, instead of creating them the same time as buf_vec?
There was a problem hiding this comment.
@tabokie If libc:: aiocb is created in multiple function calls, the compiler will assign the same address to the pointer every time, which will leads incorrect result. I'm not so familiar with the memory allocate API of trust, do you have any other good methods?
There was a problem hiding this comment.
I don't think it's possible to have multiple variables with the same address. It's possible that you mistakenly free some of them.
Signed-off-by: root <1019495690@qq.com>
Signed-off-by: root <1019495690@qq.com>
|
@tabokie PTAL |
Signed-off-by: root <1019495690@qq.com>
tabokie
left a comment
There was a problem hiding this comment.
#286 (comment) and #286 (comment) are not addressed.
| } | ||
| Ok(files[(file_seq - files[0].seq) as usize].handle.clone()) | ||
| } | ||
|
|
src/file_pipe_log/pipe.rs
Outdated
| reader.read(handle) | ||
| } | ||
|
|
||
| fn async_read(&self, blocks: Vec<FileBlockHandle>, ctx: &mut F::AsyncIoContext) { |
src/file_pipe_log/pipe.rs
Outdated
| let fd = self.get_fd(block.id.seq).unwrap(); | ||
| self.file_system.async_read(ctx, fd, block).unwrap(); |
src/env/obfuscated.rs
Outdated
| let mut res = vec![]; | ||
| for v in base { | ||
| let mut temp = vec![]; | ||
| //do obfuscation. |
There was a problem hiding this comment.
| //do obfuscation. | |
| // do obfuscation. |
src/env/obfuscated.rs
Outdated
| let base = self.inner.async_finish(ctx).unwrap(); | ||
| let mut res = vec![]; | ||
| for v in base { | ||
| let mut temp = vec![]; |
There was a problem hiding this comment.
I don't know why you need to create another vector instead of modifying base directly.
src/env/mod.rs
Outdated
| mod default; | ||
| mod obfuscated; | ||
|
|
||
| pub use default::AioContext; |
Signed-off-by: root <1019495690@qq.com>
|
@tabokie PTAL. |
|
#286 (comment) is still not addressed. Is there any confusion?
|
Signed-off-by: root <1019495690@qq.com>
|
@tabokie PTAL, thx! |
src/engine.rs
Outdated
| Ok(0) | ||
| } | ||
|
|
||
| pub fn fetch_entries_to_aio<M: Message + MessageExt<Entry = M>>( |
There was a problem hiding this comment.
| pub fn fetch_entries_to_aio<M: Message + MessageExt<Entry = M>>( | |
| pub fn fetch_entries_to_aio<M: MessageExt>( |
src/env/mod.rs
Outdated
| type Writer: Seek + Write + Send + WriteExt; | ||
| type AsyncIoContext; | ||
|
|
||
| fn async_read( |
There was a problem hiding this comment.
Since all the async happens inside the implementation, we can rename this to something like RocksDB's MultiGet, i.e. multi_read.
src/env/mod.rs
Outdated
| type Handle: Send + Sync + Handle; | ||
| type Reader: Seek + Read + Send; | ||
| type Writer: Seek + Write + Send + WriteExt; | ||
| type AsyncIoContext; |
src/engine.rs
Outdated
| } | ||
| fn scan_entries_aio<FR: Fn(u64, LogQueue, &[u8])>( |
There was a problem hiding this comment.
Add a newline between functions.
src/engine.rs
Outdated
| pub fn fetch_entries_to_aio<M: Message + MessageExt<Entry = M>>( | ||
| &self, | ||
| region_id: u64, | ||
| begin: u64, | ||
| end: u64, | ||
| max_size: Option<usize>, | ||
| vec: &mut Vec<M::Entry>, | ||
| ) -> Result<usize> { |
There was a problem hiding this comment.
I think based on the tests, you can antomatically select single_read or multi_read and avoid creating two different engine methods, e.g. use aio when blocks.len() > 4 or something.
One issue though is that I'm not sure if aio syscall is portable enough. You might need to do some research on how to detect if aio is available (maybe take a look at how RocksDB did it).
Signed-off-by: root <1019495690@qq.com>
What is changed and how it works?
Issue Number: Ref #248
What's Changed:
1.Add fetch_ entries_aio() interface, which read entries using AIO.Solution:
Manage single entry-read requests in batch through AsyncIOContext. After submit read request according to different file blocks, fetch the byte stream collection of all blocks from AsyncIOContext. In the end, parse them into Entry in turn.
Check List
Tests