Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Array::shrink_to_fit(&mut self) #6790

Open
wants to merge 13 commits into
base: main
Choose a base branch
from

Conversation

emilk
Copy link

@emilk emilk commented Nov 25, 2024

Which issue does this PR close?

Rationale for this change

Concatenating many arrow buffers incrementally can lead to situations where the buffers are using much more memory than they need (their capacity is larger than their lengths).

Example:

use arrow::{
    array::{Array, ArrayRef, ListArray, PrimitiveArray},
    buffer::OffsetBuffer,
    datatypes::{Field, UInt8Type},
};

fn main() {
    let array0: PrimitiveArray<UInt8Type> = (0..200 * 300)
        .map(|v| (v % 255) as u8)
        .collect::<Vec<_>>()
        .into();
    let array0: ArrayRef = Arc::new(array0);

    let (global, local) = memory_use(|| {
        let concatenated = concatenate(array0.clone());
        dbg!(concatenated.data_type());
        eprintln!("expected: ~{}", how_many_bytes(concatenated.clone()));
        concatenated
    });
    eprintln!("global: {global} bytes");
    eprintln!("local: {local} bytes");

    eprintln!("---");

    let array1 = ListArray::new(
        Field::new_list_field(array0.data_type().clone(), false).into(),
        OffsetBuffer::from_lengths(std::iter::once(array0.len())),
        array0.clone(),
        None,
    );
    let array1: ArrayRef = Arc::new(array1);

    let (global, local) = memory_use(|| {
        let concatenated = concatenate(array1.clone()).shrink_to_fit();
        dbg!(concatenated.data_type());
        eprintln!("expected: ~{}", how_many_bytes(concatenated.clone()));
        concatenated
    });
    eprintln!("global: {global} bytes");
    eprintln!("local: {local} bytes");
}

fn concatenate(array: ArrayRef) -> ArrayRef {
    let mut concatenated = array.clone();

    for _ in 0..1000 {
        concatenated = arrow::compute::kernels::concat::concat(&[&*concatenated, &*array]).unwrap();
    }

    concatenated
}

fn how_many_bytes(array: ArrayRef) -> u64 {
    let mut array = array;
    loop {
        match array.data_type() {
            arrow::datatypes::DataType::UInt8 => break,
            arrow::datatypes::DataType::List(_) => {
                let list = array.as_any().downcast_ref::<ListArray>().unwrap();
                array = list.values().clone();
            }
            _ => unreachable!(),
        }
    }

    array.len() as _
}

// --- Memory tracking ---

use std::sync::{
    atomic::{AtomicUsize, Ordering::Relaxed},
    Arc,
};

static LIVE_BYTES_GLOBAL: AtomicUsize = AtomicUsize::new(0);

thread_local! {
    static LIVE_BYTES_IN_THREAD: AtomicUsize = const { AtomicUsize::new(0)  } ;
}

pub struct TrackingAllocator {
    allocator: std::alloc::System,
}

#[global_allocator]
pub static GLOBAL_ALLOCATOR: TrackingAllocator = TrackingAllocator {
    allocator: std::alloc::System,
};

#[allow(unsafe_code)]
// SAFETY:
// We just do book-keeping and then let another allocator do all the actual work.
unsafe impl std::alloc::GlobalAlloc for TrackingAllocator {
    #[allow(clippy::let_and_return)]
    unsafe fn alloc(&self, layout: std::alloc::Layout) -> *mut u8 {
        LIVE_BYTES_IN_THREAD.with(|bytes| bytes.fetch_add(layout.size(), Relaxed));
        LIVE_BYTES_GLOBAL.fetch_add(layout.size(), Relaxed);

        // SAFETY:
        // Just deferring
        unsafe { self.allocator.alloc(layout) }
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: std::alloc::Layout) {
        LIVE_BYTES_IN_THREAD.with(|bytes| bytes.fetch_sub(layout.size(), Relaxed));
        LIVE_BYTES_GLOBAL.fetch_sub(layout.size(), Relaxed);

        // SAFETY:
        // Just deferring
        unsafe { self.allocator.dealloc(ptr, layout) };
    }
}

fn live_bytes_local() -> usize {
    LIVE_BYTES_IN_THREAD.with(|bytes| bytes.load(Relaxed))
}

fn live_bytes_global() -> usize {
    LIVE_BYTES_GLOBAL.load(Relaxed)
}

/// Returns `(num_bytes_allocated, num_bytes_allocated_by_this_thread)`.
fn memory_use<R>(run: impl Fn() -> R) -> (usize, usize) {
    let used_bytes_start_local = live_bytes_local();
    let used_bytes_start_global = live_bytes_global();
    let ret = run();
    let bytes_used_local = live_bytes_local() - used_bytes_start_local;
    let bytes_used_global = live_bytes_global() - used_bytes_start_global;
    drop(ret);
    (bytes_used_global, bytes_used_local)
}

If you run this you will see 12 MB is used for 6 MB of data.

Adding a call to the new .shrink_to_fit(); on concatenated removes the memory overhead.

What changes are included in this PR?

This PR adds shrink_to_fit(&mu self) to Array and all buffers.

It is best-effort.

Are there any user-facing changes?

trait Array now has a fn shrink_to_fit(&mut self) { }.

@github-actions github-actions bot added the arrow Changes to the arrow crate label Nov 25, 2024
/// └────────────────────┘ left
///
///
/// 2 views
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ooops… my editor is set to trim trailing whitspace on save. Let me know if you want me to reverse.

arrow-array/src/array/mod.rs Outdated Show resolved Hide resolved
@emilk emilk marked this pull request as ready for review November 25, 2024 16:07
Copy link
Contributor

@tustvold tustvold left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should make this call realloc or no-op, i.e. match the broad semantic of Vec::shrink_to_fit.

I think more comprehensive compaction, including materializing offsets, recomputing dictionaries, etc... belongs in a separate kernel where this behaviour can be configured.

arrow-array/src/array/mod.rs Outdated Show resolved Hide resolved
arrow-buffer/src/buffer/immutable.rs Outdated Show resolved Hide resolved
arrow-buffer/src/buffer/offset.rs Outdated Show resolved Hide resolved
arrow-array/src/array/mod.rs Outdated Show resolved Hide resolved
@emilk
Copy link
Author

emilk commented Nov 26, 2024

I've added a test to make sure the shrink_to_fit call actually returns memory to global allocator, as is its job.

This works with the current code, but if I try to use std::alloc::realloc the memory is never returned. It is as if the realloc call succeeds without actually freeing up the extra memory. Here is the diff for what I tried:

diff --git a/arrow-buffer/src/buffer/immutable.rs b/arrow-buffer/src/buffer/immutable.rs
index 40625329a..70a100734 100644
--- a/arrow-buffer/src/buffer/immutable.rs
+++ b/arrow-buffer/src/buffer/immutable.rs
@@ -173,7 +173,15 @@ impl Buffer {
     ///
     /// If the capacity is already less than or equal to the desired capacity, this is a no-op.
     pub fn shrink_to_fit(&mut self) {
-        if self.len() < self.capacity() {
+        let desired_capacity = self.len();
+        if desired_capacity < self.capacity() {
+            if let Some(bytes) = Arc::get_mut(&mut self.data) {
+                if bytes.try_realloc(desired_capacity).is_ok() {
+                    return;
+                }
+            }
+
+            // Fallback:
             *self = Self::from_vec(self.as_slice().to_vec())
         }
     }
diff --git a/arrow-buffer/src/bytes.rs b/arrow-buffer/src/bytes.rs
index ba61342d8..e616ee7c0 100644
--- a/arrow-buffer/src/bytes.rs
+++ b/arrow-buffer/src/bytes.rs
@@ -96,6 +96,27 @@ impl Bytes {
         }
     }
 
+    /// Try to reallocate the underlying memory region to a new size (smaller or larger).
+    ///
+    /// Returns `Err(())` if the reallocation failed.
+    /// Only works for bytes allocated with the standard allocator.
+    pub fn try_realloc(&mut self, new_len: usize) -> Result<(), ()> {
+        if let Deallocation::Standard(layout) = self.deallocation {
+            if let Ok(new_layout) = std::alloc::Layout::from_size_align(new_len, layout.align()) {
+                let new_ptr =
+                    unsafe { std::alloc::realloc(self.ptr.as_mut(), new_layout, new_len) };
+                if let Some(ptr) = NonNull::new(new_ptr) {
+                    self.ptr = ptr;
+                    self.len = new_len;
+                    self.deallocation = Deallocation::Standard(new_layout);
+                    return Ok(());
+                }
+            }
+        }
+
+        Err(())
+    }
+
     #[inline]
     pub(crate) fn deallocation(&self) -> &Deallocation {
         &self.deallocation

@emilk
Copy link
Author

emilk commented Nov 26, 2024

What do you think about adding a struct ShrinkPolicy parameter to shrink_to_fit? That would allow us to add custom options in the future without breaking backwards compatibility. We could start with

#[non_exhaustive]
#[derive(Debug, Default)]
pub struct ShrinkPolicy {}

Options I could see us adding in the future:

  • What to do with shared ArrayRef
  • What to do with shared Buffer

@tustvold
Copy link
Contributor

tustvold commented Nov 26, 2024

What do you think about adding a struct ShrinkPolicy parameter to shrink_to_fit? That would allow us to add custom options in the future without breaking backwards compatibility.

It is an idea with merit, but I think let's keep shrink_to_fit simple. More complex logic to recompute minimal array representations belongs in arrow-select, we want to keep arrow-array as lightweight as possible.

the memory is never returned

How are you measuring this? Many allocators, glibc especially, hang onto memory that isn't in use to avoid thrashing the TLB. They will reuse it, they just won't give it back to the system.

@emilk
Copy link
Author

emilk commented Nov 26, 2024

How are you measuring this? Many allocators, glibc especially, hang onto memory that isn't in use to avoid thrashing the TLB. They will reuse it, they just won't give it back to the system.

I install an allocator and note down all calls to it (see arrow/tests/shrink_to_fit.rs in this PR). But I realized I had a bug in my call to realloc - I need to give it the layout of the previous allocation, but I gave it the layout to the new allocation. Fixing that bug makes the realloc path work 👍

let (concatenated, _bytes_allocated_globally, bytes_allocated_by_this_thread) =
memory_use(|| {
let mut concatenated = concatenate(num_concats, list_array.clone());
concatenated.shrink_to_fit(); // This is what we're testing!
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this call, this test fails (as it should).
In other words, this test is a regression-test for shrink_to_fit

@emilk
Copy link
Author

emilk commented Nov 26, 2024

Good idea with realloc - thanks for the swift feedback. I think this PR is ready for a final review now!

Copy link
Contributor

@tustvold tustvold left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good to me, thank you.

I'll leave this open for a little longer in case anyone else wants to review, memory management shenanigans can have subtleties.

arrow-buffer/src/bytes.rs Show resolved Hide resolved
@emilk
Copy link
Author

emilk commented Nov 26, 2024

Btw, this is not a breaking change, so this could be added to an arrow 53.4.0 minor release (which I would greatly appreciate 🙏)

@tustvold
Copy link
Contributor

tustvold commented Nov 26, 2024

The release schedule can be found in the repository readme, the next release will be a major release in early December. Unfortunately we've already started integrating breaking changes and so a patch release is unlikely save for a major security vulnerability.

See #5368 if you're interested in some of the history behind this

assert_eq!(shrunk_empty.len(), 0);
assert_eq!(shrunk_empty.capacity(), 1); // `Buffer` and `Bytes` doesn't support 0-capacity, so we shrink to 1
assert_eq!(shrunk_empty.as_slice(), &[]);
assert_eq!(shrunk_empty.capacity(), 1); // NOTE: `Buffer` and `Bytes` doesn't support 0-capacity
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They should do, but IIRC you need to use a dangling ptr, there should be some examples of this...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For MutableBuffer there is special handling for the size=0 case, with a dangling_ptr helper. We could copy all that logic to Bytes, but I rather not add all that complexity in this PR.

@emilk
Copy link
Author

emilk commented Nov 26, 2024

MIRI caught a couple of bugs in my unsafe code. I pushed some fixes and test improvements.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
arrow Changes to the arrow crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add shrink_to_fit to Array
2 participants