From d1f4c5733cea31f9c3bd803f2d4c6c074a336744 Mon Sep 17 00:00:00 2001 From: ddidderr Date: Wed, 9 Aug 2023 21:23:00 +0200 Subject: [PATCH] init --- .gitignore | 1 + Cargo.lock | 69 +++++++++++++++++++++++++++++++++++ Cargo.toml | 15 ++++++++ src/main.rs | 103 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 188 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.lock create mode 100644 Cargo.toml create mode 100644 src/main.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..a56854b --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,69 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "libc" +version = "0.2.147" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" + +[[package]] +name = "memoffset" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" +dependencies = [ + "autocfg", +] + +[[package]] +name = "nix" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a" +dependencies = [ + "bitflags", + "cfg-if", + "libc", + "memoffset", + "pin-utils", + "static_assertions", +] + +[[package]] +name = "parallel-copy" +version = "0.1.0" +dependencies = [ + "nix", +] + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..672e57a --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "parallel-copy" +version = "0.1.0" +edition = "2021" + +[dependencies] +nix = "0.26" + + +[profile.release] +lto = true +debug = false +strip = true +panic = "unwind" +codegen-units = 1 diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..ce1e9e2 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,103 @@ +use nix::unistd::ftruncate; +use std::fs::OpenOptions; +use std::io::{Read, Seek, SeekFrom, Write}; +use std::thread; +use std::time::Instant; +use std::{cmp, fs}; +use std::{env, fs::File, os::fd::IntoRawFd}; + +const NUM_THREADS: i64 = 8; + +fn truncate_dst(f: File, len: i64) -> nix::Result<()> { + ftruncate(f.into_raw_fd(), len) +} + +fn copy_range(src: &mut File, dst: &mut File, offset: u64, len: u64, chunk_bytes: u64) { + src.seek(SeekFrom::Start(offset)).unwrap(); + dst.seek(SeekFrom::Start(offset)).unwrap(); + + let end = cmp::min(offset + chunk_bytes, len); + + // replace chunk_bytes with the actual nr of chunk_bytes + // which can be less if we are at the last chunk of src + let chunk_bytes = (end - offset) as usize; + + let bufsz = cmp::min(1024 * 1024, chunk_bytes); + let mut buf = vec![0u8; bufsz]; + + let nr_of_full_reads = chunk_bytes / bufsz; + let bytes_for_last_read = chunk_bytes % bufsz; + + // full reads + for _ in 0..nr_of_full_reads { + src.read_exact(&mut buf).unwrap(); + dst.write_all(&buf).unwrap(); + } + + // last (partial) read + if bytes_for_last_read > 0 { + buf.resize(bytes_for_last_read, 0); + src.read_exact(&mut buf).unwrap(); + dst.write_all(&buf).unwrap(); + } +} + +fn thread_copy_chunks(thread_nr: i64, src: &str, dst: &str, len: i64) { + let mut src = File::open(src).unwrap(); + let mut dst = OpenOptions::new().write(true).open(dst).unwrap(); + + let chunk_bytes = len / (NUM_THREADS * 2); + + let jump = NUM_THREADS * chunk_bytes; + + let mut offset = thread_nr * chunk_bytes; + let mut loop_count = 0; + loop { + println!("t{thread_nr} l{loop_count} {offset} chunk={chunk_bytes}"); + + copy_range( + &mut src, + &mut dst, + offset as u64, + len as u64, + chunk_bytes as u64, + ); + + offset += jump; + if offset >= len { + break; + } + + loop_count += 1; + } +} + +fn main() { + let src = env::args().nth(1).unwrap(); + let dst = env::args().nth(2).unwrap(); + + let len = fs::metadata(&src).unwrap().len(); + if len > i64::MAX as u64 { + panic!("src len > i64::MAX"); + } + + { + let dst = File::create(&dst).unwrap(); + truncate_dst(dst, len as i64).unwrap(); + } + + println!("Spawning {NUM_THREADS} threads to copy {src} to {dst} ({len} bytes)"); + + let now = Instant::now(); + // create threads in scope so borrows from the main thread + // outlive all threads + thread::scope(|s| { + for thread_nr in 0..NUM_THREADS { + let src_ref = &src; + let dst_ref = &dst; + s.spawn(move || thread_copy_chunks(thread_nr, src_ref, dst_ref, len as i64)); + } + }); + + println!("took {:.3}s", now.elapsed().as_secs_f64()); +}