The intermediate type of collect_list/collect_set isn't compatible with Spark #12023

Open
NEUpanning opened this issue Jan 6, 2025 · 4 comments
Labels
bug Something isn't working triage Newly created issue that needs attention.

Comments

@NEUpanning
Contributor

Bug description

The intermediate type of Spark's collect_list/collect_set is BINARY, while the intermediate type of Velox's collect_list/collect_set is ARRAY, which is incompatible with BINARY. The current workaround in Gluten causes several issues, including gluten#8227 and gluten#8184.

A complete solution is to change the intermediate type of Velox's collect_list/collect_set to VARBINARY and use the UnsafeArrayData format for SerDe.
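The UnsafeArrayData layout can be sketched for the simplest case as follows. The sketch follows Spark's documented layout: an 8-byte element count, a null bitmap padded to whole 8-byte words, then fixed-width value slots. It handles only non-nested BIGINT elements. Variable-length elements such as strings or nested arrays are stored as offset-and-length slots plus a variable-length region, and this sketch does not cover them. The helper names are hypothetical, and a little-endian host is assumed.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <optional>
#include <string>
#include <vector>

// Size of the UnsafeArrayData header: an 8-byte element count followed by a
// null bitmap with one bit per element, padded to whole 8-byte words.
size_t unsafeArrayHeaderBytes(size_t numElements) {
  return 8 + ((numElements + 63) / 64) * 8;
}

// Hypothetical helper: write nullable BIGINT values in UnsafeArrayData layout.
// Assumes a little-endian host, since Spark writes these words in native order.
std::string serializeBigintArray(const std::vector<std::optional<int64_t>>& values) {
  const size_t n = values.size();
  const size_t header = unsafeArrayHeaderBytes(n);
  // BIGINT slots are 8 bytes each, so the value region is already word-aligned.
  std::string out(header + 8 * n, '\0');
  const int64_t count = static_cast<int64_t>(n);
  std::memcpy(out.data(), &count, sizeof(count));
  for (size_t i = 0; i < n; ++i) {
    if (!values[i]) {
      // Null element: set bit i of the bitmap and leave its slot zeroed.
      auto& bits = reinterpret_cast<unsigned char&>(out[8 + i / 8]);
      bits |= static_cast<unsigned char>(1u << (i % 8));
    } else {
      const int64_t v = *values[i];
      std::memcpy(out.data() + header + 8 * i, &v, sizeof(v));
    }
  }
  return out;
}

// Hypothetical inverse of serializeBigintArray: read count, bitmap and slots.
std::vector<std::optional<int64_t>> deserializeBigintArray(const std::string& bytes) {
  int64_t count = 0;
  std::memcpy(&count, bytes.data(), sizeof(count));
  const size_t n = static_cast<size_t>(count);
  const size_t header = unsafeArrayHeaderBytes(n);
  std::vector<std::optional<int64_t>> result(n);  // all nullopt initially
  for (size_t i = 0; i < n; ++i) {
    const auto bits = static_cast<unsigned char>(bytes[8 + i / 8]);
    if ((bits >> (i % 8)) & 1u) {
      continue;  // null element stays nullopt
    }
    int64_t v = 0;
    std::memcpy(&v, bytes.data() + header + 8 * i, sizeof(v));
    result[i] = v;
  }
  return result;
}
```

For the input {1, null, 3}, the serialized form is 40 bytes: 8 for the count, 8 for the bitmap (only bit 1 set), and 24 for the three value slots.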

System information

/

Relevant logs

No response

@NEUpanning NEUpanning added bug Something isn't working triage Newly created issue that needs attention. labels Jan 6, 2025
@NEUpanning
Contributor Author

cc @zhztheplayer

@zhztheplayer
Contributor

Thank you for working on this @NEUpanning.

This will involve changes to the Spark functions collect_list / collect_set to better align them with vanilla Spark. See the links in the PR description for more information. If anyone has thoughts on this topic, please discuss here.

cc @rui-mo

@NEUpanning
Contributor Author

In the intermediate step of the collect_list function, we need the raw input type to deserialize the intermediate data. However, we cannot obtain it from argTypes or resultType, so we need to include the serialized raw input type as part of the intermediate data. After discussing this with @zhztheplayer offline, we concluded that we could use Type::serialize to serialize the type information.
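One way to ship the raw input type together with the array bytes is a length-prefixed frame, sketched below. The frame layout and the names encodeIntermediate/decodeIntermediate are hypothetical and not necessarily the format used in the draft PR. In Velox, Type::serialize() returns a folly::dynamic that would first need converting to text (for example with folly::toJson). Here the type is passed in as an already-serialized string so the sketch compiles on its own.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical frame: [4-byte type length][serialized type][array payload].
std::string encodeIntermediate(const std::string& serializedType,
                               const std::string& arrayPayload) {
  const uint32_t typeLen = static_cast<uint32_t>(serializedType.size());
  std::string out(sizeof(typeLen), '\0');
  std::memcpy(out.data(), &typeLen, sizeof(typeLen));  // native byte order
  out += serializedType;
  out += arrayPayload;
  return out;
}

// Splits a frame back into (serialized type, array payload), rejecting
// frames too short to hold the declared type string.
std::pair<std::string, std::string> decodeIntermediate(const std::string& bytes) {
  uint32_t typeLen = 0;
  if (bytes.size() < sizeof(typeLen)) {
    throw std::invalid_argument("frame too short for type length");
  }
  std::memcpy(&typeLen, bytes.data(), sizeof(typeLen));
  if (bytes.size() - sizeof(typeLen) < typeLen) {
    throw std::invalid_argument("frame too short for type");
  }
  return {bytes.substr(sizeof(typeLen), typeLen),
          bytes.substr(sizeof(typeLen) + typeLen)};
}
```

Because the type comes first, the merge step can learn how to interpret the array payload before reading it.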

@NEUpanning
Contributor Author

NEUpanning commented Jan 20, 2025

@zhztheplayer @rui-mo
I opened a draft PR in #12121. After this change, Velox will not automatically generate companion functions for collect_list if the return type is still array(T). Perhaps we could create a signature for each data type like this:

std::vector<std::shared_ptr<exec::AggregateFunctionSignature>> signatures;
// Register one concrete signature per input type T:
// array(T) result, varbinary intermediate.
for (const auto& inputType :
     {"tinyint", "smallint", "integer", "bigint", "real", "double" /* ... */}) {
  auto returnType = fmt::format("array({})", inputType);
  signatures.push_back(exec::AggregateFunctionSignatureBuilder()
                           .returnType(returnType)
                           .intermediateType("varbinary")
                           .argumentType(inputType)
                           .build());
}

However, we may not be able to enumerate all types this way, since nested types such as array(T), map(K, V) and row(...) are parameterized. What do you think? Thanks!


2 participants