This commit is contained in:
ddidderr 2023-08-09 21:23:00 +02:00
commit d1f4c5733c
Signed by: ddidderr
GPG Key ID: 3841F1C27E6F0E14
4 changed files with 188 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target

69
Cargo.lock generated Normal file
View File

@ -0,0 +1,69 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "autocfg"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "libc"
version = "0.2.147"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3"
[[package]]
name = "memoffset"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4"
dependencies = [
"autocfg",
]
[[package]]
name = "nix"
version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a"
dependencies = [
"bitflags",
"cfg-if",
"libc",
"memoffset",
"pin-utils",
"static_assertions",
]
[[package]]
name = "parallel-copy"
version = "0.1.0"
dependencies = [
"nix",
]
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"

15
Cargo.toml Normal file
View File

@ -0,0 +1,15 @@
[package]
name = "parallel-copy"
version = "0.1.0"
edition = "2021"
[dependencies]
nix = "0.26"
[profile.release]
lto = true
debug = false
strip = true
panic = "unwind"
codegen-units = 1

103
src/main.rs Normal file
View File

@ -0,0 +1,103 @@
use nix::unistd::ftruncate;
use std::fs::OpenOptions;
use std::io::{Read, Seek, SeekFrom, Write};
use std::thread;
use std::time::Instant;
use std::{cmp, fs};
use std::{env, fs::File, os::fd::IntoRawFd};
const NUM_THREADS: i64 = 8;
fn truncate_dst(f: File, len: i64) -> nix::Result<()> {
ftruncate(f.into_raw_fd(), len)
}
fn copy_range(src: &mut File, dst: &mut File, offset: u64, len: u64, chunk_bytes: u64) {
src.seek(SeekFrom::Start(offset)).unwrap();
dst.seek(SeekFrom::Start(offset)).unwrap();
let end = cmp::min(offset + chunk_bytes, len);
// replace chunk_bytes with the actual nr of chunk_bytes
// which can be less if we are at the last chunk of src
let chunk_bytes = (end - offset) as usize;
let bufsz = cmp::min(1024 * 1024, chunk_bytes);
let mut buf = vec![0u8; bufsz];
let nr_of_full_reads = chunk_bytes / bufsz;
let bytes_for_last_read = chunk_bytes % bufsz;
// full reads
for _ in 0..nr_of_full_reads {
src.read_exact(&mut buf).unwrap();
dst.write_all(&buf).unwrap();
}
// last (partial) read
if bytes_for_last_read > 0 {
buf.resize(bytes_for_last_read, 0);
src.read_exact(&mut buf).unwrap();
dst.write_all(&buf).unwrap();
}
}
fn thread_copy_chunks(thread_nr: i64, src: &str, dst: &str, len: i64) {
let mut src = File::open(src).unwrap();
let mut dst = OpenOptions::new().write(true).open(dst).unwrap();
let chunk_bytes = len / (NUM_THREADS * 2);
let jump = NUM_THREADS * chunk_bytes;
let mut offset = thread_nr * chunk_bytes;
let mut loop_count = 0;
loop {
println!("t{thread_nr} l{loop_count} {offset} chunk={chunk_bytes}");
copy_range(
&mut src,
&mut dst,
offset as u64,
len as u64,
chunk_bytes as u64,
);
offset += jump;
if offset >= len {
break;
}
loop_count += 1;
}
}
fn main() {
let src = env::args().nth(1).unwrap();
let dst = env::args().nth(2).unwrap();
let len = fs::metadata(&src).unwrap().len();
if len > i64::MAX as u64 {
panic!("src len > i64::MAX");
}
{
let dst = File::create(&dst).unwrap();
truncate_dst(dst, len as i64).unwrap();
}
println!("Spawning {NUM_THREADS} threads to copy {src} to {dst} ({len} bytes)");
let now = Instant::now();
// create threads in scope so borrows from the main thread
// outlive all threads
thread::scope(|s| {
for thread_nr in 0..NUM_THREADS {
let src_ref = &src;
let dst_ref = &dst;
s.spawn(move || thread_copy_chunks(thread_nr, src_ref, dst_ref, len as i64));
}
});
println!("took {:.3}s", now.elapsed().as_secs_f64());
}