Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent FlightData overflowing max size limit whenever possible. #6690

Open
wants to merge 20 commits into
base: main
Choose a base branch
from

Conversation

itsjunetime
Copy link
Contributor

Which issue does this PR close?

Closes #3478

What changes are included in this PR?

This reworks the encoding/writing step of flight data messages to ensure it never overflows the given limit whenever possible (specifically, it's impossible when we can't even fit a single row + header within the limit - there are still no mechanisms for splitting a single row of data between multiple messages).

It does this by first constructing a fake IPC header, then getting that header's encoded length, and then subtracting that length from the provided max size. Because the header's size stays the same with the same schema (the only thing that changes is the value within the 'length of data' fields), we don't need to continually recalculate it.

There are more tests I'd like to add before merging this, I was just hoping to get this filed first so that I could get feedback in case any behavior seemed seriously off.

Rationale for these changes

Since we are dynamically checking array data sizes to see if they can fit within the alloted size, this ensures that they will never go over if possible. Of course, as I said before, they will still go over if necessary, but I've rewritten the tests to check this behavior (if the tests sense an overage, but decode it to see that only one row was written, they allow it as there is no other way to get the data across).

Are there any user-facing changes?

Yes, there are API additions. They are documented. As far as I can tell, this shouldn't require a breaking release, but I haven't run anything like cargo-semver-checks on it to actually verify.

@github-actions github-actions bot added arrow Changes to the arrow crate arrow-flight Changes to the arrow-flight crate labels Nov 5, 2024
};

match array_data.data_type() {
DataType::Dictionary(_, _) => Ok(()),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is something that I wasn't able to figure out about this encoding process - it seems we don't write the child_data for dictionaries into the encoded message, but that's where all the values of the dictionary are. Without this, we only have the keys written. Does anyone know why this is?

@itsjunetime
Copy link
Contributor Author

It looks like CI failed due to some network flakiness - I'm going to close and reopen to try it again

@itsjunetime itsjunetime closed this Nov 8, 2024
@itsjunetime itsjunetime reopened this Nov 8, 2024
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @itsjunetime

I went over this PR carefully.

I think this is a very nice improvement -- I think the reason it is so large is that it makes the slice size calculation more accurate, which seems like a good think to me

One thing that would (selfishly) help me review these PRs is if you could annotate anything "non obvuous" with the rationale of why.

Examples of non obvious changes are:

  1. The change to bit_slice
  2. Switching the code back to use the (deprecated) method flight_data_from_arrow_batch

BTW what do you think we should do about flight_data_from_arrow_batch given that you changed a bunch of code in this PR to use it it seems like the suggested alternatives are not a good fit?

/// otherwise a new buffer is allocated and filled with a copy of the bits in the range.
pub fn bit_slice(&self, offset: usize, len: usize) -> Self {
if offset % 8 == 0 {
return self.slice(offset / 8);
if offset % 8 == 0 && len % 8 == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like it may fix a bug where the length is incorrectly set after slice if the offset is zero 👍

Can you please also add a unit tests showing this bug fix (aka a unit test in immutable.rs)

I verified that the tests in this PR fail after this change:

Encoding 1023 with a maximum size of 1024
test encode::tests::flight_data_size_string_dictionary ... FAILED

failures:

failures:
    encode::tests::flight_data_size_string_dictionary

test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 28 filtered out; finished in 0.00s


--- STDERR:              arrow-flight encode::tests::flight_data_size_string_dictionary ---
thread 'encode::tests::flight_data_size_string_dictionary' panicked at arrow-flight/src/encode.rs:1717:21:
assertion `left == right` failed: encoded data[1]: actual size 1136, actual_overage: 112
  left: 112
 right: 0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes - I'll pull this out to a separate PR to make this easier to review and add a unit test there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've moved this to #6707

@@ -327,6 +327,10 @@ impl FlightDataEncoder {

/// Encodes batch into one or more `FlightData` messages in self.queue
fn encode_batch(&mut self, batch: RecordBatch) -> Result<()> {
if batch.num_rows() == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of this optimization?

I vaguely remember at least one usecase of encoding empty batches is send the schema information. With this change it seems empty batches can no longer be encoded

Unless there is a reason to prevent empty batches, I would suggest we remove this check here and instead callers of the encoder can decide to filter out empty batches if they want

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not an optimization, but rather due to a difference in behavior that the tests dictate - in some of the tests, encoding an empty batch should result in an IPC header + empty data, but in some other tests, encoding an empty batch should result in nothing. I haven't yet figured out exactly why that is, and need to do some more poking to figure out why different parts of the code want this different behavior.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it looks like there are two contradicting comments regarding this behavior:

  1. The doc comment for this function says one or more FlightData messages - implying that if we send an empty batch, it should still load one FlightData message into the queue. This is at odds with the previous behavior, which just used the response from split_batch_for_grpc_response as an iterator for batches to load in, but split_batch... returned an empty vector when given a RecordBatch with 0 rows.
  2. A comment in the arrow-flight tests (here) that are relevant to this change says that empty batches should not be encoded. If we comment out the filter that removes empty batches AND comment out this check for empty batches, all tests pass.

Because there are multiple conflicting records of what is the expected behavior, I'm not sure what to do. I feel like we should respect the doc-comment, as that's what's documented to the users, but the behavior seems to be contrary to that, and it's also contrary to some other tests in arrow-ipc that always expect encoding of a Batch to return some encoded data (even if it's just a header).

@@ -700,6 +682,9 @@ mod tests {
use super::*;

#[test]
// flight_data_from_arrow_batch is deprecated but does exactly what we need. Would probably be
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not continue to use make_flight_data (which seems like is exactly what this comment proposes -- a copy of flight_data_from_arrow_batch?

This change seems like it would only make the final removal of flight_data_from_arrow_batch harder (would have to re-create make_flight_data)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I just didn't see the purpose for duplicating code when this method already existed and did the exact same thing as make_flight_data. It also seems that flight_data_from_arrow_batch is a perfectly fine method, just easy to misuse, and that's why it was deprecated. Its behavior was duplicated in many different places throughout this crate, though - I don't think it's something that we would ever want to completely remove from usage, as it clearly has an internal purpose.

When we want to remove flight_data_from_arrow_batch from the public API, we could just comfortably make it private, and keep using its functionality internally.

Does that make sense? I feel like I may not be explaining my thoughts perfectly.

let b = pretty_format_batches(&[batch]).unwrap().to_string();
assert_eq!(a, b);
}

#[test]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please explain why this test removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test simply checked something that isn't applicable anymore - it assumes that we can split a single RecordBatch into smaller RecordBatches with less rows which saturate the given size, but we don't want to verify that we can do that anymore as we don't want to provide any way to split RecordBatches without also accomodating for the size of the IPC Headers (and the rest of the functions in this mod are already doing that).

This test was just mainly there for the split_batch_for_grpc_response function, and that's just not part of the design anymore.

@@ -1679,9 +1637,7 @@ mod tests {

let batch = RecordBatch::try_from_iter(vec![("a1", Arc::new(array) as _)]).unwrap();

// overage is much higher than ideal
// https://github.com/apache/arrow-rs/issues/3478
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉


// If only 1 row was sent over via this recordBatch, there was no way to avoid
// going over the limit. There's currently no mechanism for splitting a single row
// of results over multiple messages, so we allow going over the limit if it's the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this makes sense -- could you also put this caveat in the documentation for max_encoding_message_size ? (we can do it as a follow on PR too)

https://docs.rs/arrow-flight/latest/arrow_flight/flight_service_client/struct.FlightServiceClient.html#method.max_encoding_message_size

let dictionary_flight_data: Vec<FlightData> =
encoded_dictionaries.into_iter().map(Into::into).collect();
let mut batch_flight_data: FlightData = encoded_batch.into();
#[allow(deprecated)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems unfortunate that we are reverting back to the deprecated method. Maybe we can figure out some non deprecated API 🤔

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @itsjunetime I got though a bunch of this PR today but even after reviewing it for an hour I was not able to make it through the entire thing

The changes to flight make sense, but I am struggling to review the changes in arrow-ipc

Is there any way you can break it up into smaller pieces to help review? If not I will try and find more contiguous time to review this but it may be a while (like in a few weeks) before I find it

arrow-ipc/src/writer.rs Outdated Show resolved Hide resolved
arrow-ipc/src/writer.rs Outdated Show resolved Hide resolved
Copy link
Contributor

@tustvold tustvold left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a brief skim, couple of comments

Adding an IPC specific API to ArrayData seems a touch unfortunate. It's probably ok, but a little off.

I'm not really sure of the context for this PR, but assuming it is to better avoid the gRPC limits, I worry this may be a fools errand. There is a lot beyond the data buffers going into those payloads (e.g. metadata flatbuffers, framing protobuf, HTTP framing, etc...) and trying to account for all of this is going to be a never ending game of wack a mole. Ultimately the only solution I can see is to set the soft limit in the encoder well below the hard limit enforced by the transport.

match spec {
// Just pulled from arrow-ipc
#[inline]
fn pad_to_alignment(alignment: u8, len: usize) -> usize {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is assuming alignment is a power of two

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching that - I think the best move would just be documenting this in the function doc-comment so people can be aware of that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in ae37b13

@alamb
Copy link
Contributor

alamb commented Nov 15, 2024

I had a brief skim, couple of comments

Adding an IPC specific API to ArrayData seems a touch unfortunate. It's probably ok, but a little off.

I'm not really sure of the context for this PR, but assuming it is to better avoid the gRPC limits, I worry this may be a fools errand. There is a lot beyond the data buffers going into those payloads (e.g. metadata flatbuffers, framing protobuf, HTTP framing, etc...) and trying to account for all of this is going to be a never ending game of wack a mole. Ultimately the only solution I can see is to set the soft limit in the encoder well below the hard limit enforced by the transport.

The context of this PR is that we have set the soft limit well below the hard limit (2MB vs 4MB) and somehow a customer still managed to hit the hard limit. So @itsjunetime is trying to improve the specificity of the size.

We (at least our team in Influx) don't directly control all layers involved in gRPC communication -- this limit is being enforced by one of the various programs / mixings installed in kubernetes to manage traffic, report on things, etc. While in theory we should be able to figure out which one and increase its limits, that will also likely be a never ending game of whack a mole.

Let me see if I can help to find a way to break this PR up into smaller, more manageable pieces, so that we can get this in / tested in a way that is reasonable to maintain

@itsjunetime
Copy link
Contributor Author

Let me see if I can help to find a way to break this PR up into smaller, more manageable pieces, so that we can get this in / tested in a way that is reasonable to maintain

One change I could make that may help with breaking this up and also with @tustvold's concern about the IPC-specific interface would be to maybe genericize the get_memory_slice_size_with_alignment over a T: MemoryAccountant (or something like that), e.g.:

enum SizeSource {
  Buffer(BufferType),
  NullBuffer,
  ChildData,
  // does it need to be more granular? idk
}

trait MemoryAccountant {
  fn count_size(&mut self, size: usize, source: SizeSource);
}

impl ArrayData {
  fn get_slice_memory_size_with_accountant<A: MemoryAccountant>(
    &self,
    acc: &mut A
  ) -> Result<(), ArrowError> {
    // ...
  }

  fn get_slice_memory_size(&self) -> Result<usize, ArrowError> {
    struct DefaultAccountant(usize);

    impl MemoryAccountant for DefaultAccountant {
      fn count_size(&mut self, size: usize, _: SizeSource) {
        self.0 += size;
      }
    }
  
    let mut acc = DefaultAccountant(0);
    self.get_slice_memory_size_with_accountant(&mut acc)?;
    Ok(acc.0)
  }
}

This would allows us to use it nicely with the 'alignment' accounting that we need without being too IPC-specific. It would also allow us to remove the ugly re-accounting for RunEndEncoded buffers that this PR adds in get_encoded_arr_batch_size, which would be nice.

Obviously, I'd be happy to make this change and pull it out to a separate PR (to make this PR easier to review once that separate PR is merged) if we feel like this would be a better move.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
arrow Changes to the arrow crate arrow-flight Changes to the arrow-flight crate
Projects
None yet
3 participants