Support float64 in the streaming process#1001
Conversation
32cbcbc to
2ddd503
Compare
There was a problem hiding this comment.
Pull request overview
This PR extends TopN handling to support float64 values end-to-end (streaming, query planning, serialization), and adjusts distributed TopN aggregation behavior, backed by additional test cases and test data.
Changes:
- Introduce a
SortableValueabstraction (int/float) for TopN values and aggregation, including float-aware marshal/unmarshal. - Update streaming TopN to support float sorting via a field-type-driven comparator (
WithFieldType) and update query TopN queueing to handle float values. - Add/extend TopN test cases and test data for int64 and float64 aggregations (SUM/MAX/MIN/MEAN/COUNT).
Reviewed changes
Copilot reviewed 53 out of 53 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| test/cases/topn/topn.go | Adds new TopN test entries for int64 and float64 aggregation scenarios. |
| test/cases/topn/data/want/topn_sum.yaml | Expected output for int64 SUM aggregation TopN. |
| test/cases/topn/data/want/topn_min.yaml | Expected output for int64 MIN aggregation TopN. |
| test/cases/topn/data/want/topn_mean.yaml | Expected output for int64 MEAN aggregation TopN. |
| test/cases/topn/data/want/topn_max.yaml | Expected output for int64 MAX aggregation TopN. |
| test/cases/topn/data/want/topn_count.yaml | Expected output for int64 COUNT aggregation TopN. |
| test/cases/topn/data/want/float_sum_aggr_desc.yaml | Expected output for float64 SUM TopN (DESC). |
| test/cases/topn/data/want/float_max_aggr_desc.yaml | Expected output for float64 MAX TopN (DESC). |
| test/cases/topn/data/want/float_min_aggr_asc.yaml | Expected output for float64 MIN TopN (ASC). |
| test/cases/topn/data/want/float_mean_aggr_desc.yaml | Expected output for float64 MEAN TopN (DESC). |
| test/cases/topn/data/want/float_count_aggr_desc.yaml | Expected output for float64 COUNT TopN (DESC). |
| test/cases/topn/data/input/topn_sum.yaml | Adds TopN test input config for SUM aggregation. |
| test/cases/topn/data/input/topn_sum.ql | Adds QL for SUM aggregation TopN. |
| test/cases/topn/data/input/topn_min.yaml | Adds TopN test input config for MIN aggregation. |
| test/cases/topn/data/input/topn_min.ql | Adds QL for MIN aggregation TopN. |
| test/cases/topn/data/input/topn_mean.yaml | Adds TopN test input config for MEAN aggregation. |
| test/cases/topn/data/input/topn_mean.ql | Adds QL for MEAN aggregation TopN. |
| test/cases/topn/data/input/topn_max.yaml | Adds TopN test input config for MAX aggregation. |
| test/cases/topn/data/input/topn_max.ql | Adds QL for MAX aggregation TopN. |
| test/cases/topn/data/input/topn_count.yaml | Adds TopN test input config for COUNT aggregation. |
| test/cases/topn/data/input/topn_count.ql | Adds QL for COUNT aggregation TopN. |
| test/cases/topn/data/input/float_sum_aggr_desc.yaml | Adds float64 TopN test input config for SUM (DESC). |
| test/cases/topn/data/input/float_sum_aggr_desc.ql | Adds float64 QL for SUM (DESC). |
| test/cases/topn/data/input/float_min_aggr_asc.yaml | Adds float64 TopN test input config for MIN (ASC). |
| test/cases/topn/data/input/float_min_aggr_asc.ql | Adds float64 QL for MIN (ASC). |
| test/cases/topn/data/input/float_mean_aggr_desc.yaml | Adds float64 TopN test input config for MEAN (DESC). |
| test/cases/topn/data/input/float_mean_aggr_desc.ql | Adds float64 QL for MEAN (DESC). |
| test/cases/topn/data/input/float_max_aggr_desc.yaml | Adds float64 TopN test input config for MAX (DESC). |
| test/cases/topn/data/input/float_max_aggr_desc.ql | Adds float64 QL for MAX (DESC). |
| test/cases/topn/data/input/float_count_aggr_desc.yaml | Adds float64 TopN test input config for COUNT (DESC). |
| test/cases/topn/data/input/float_count_aggr_desc.ql | Adds float64 QL for COUNT (DESC). |
| test/cases/measure/data/testdata/topn_metric_data.json | Adds measure write test data used by TopN aggregation tests (int). |
| test/cases/measure/data/testdata/service_instance_float_metric_data.json | Adds measure write test data used by TopN aggregation tests (float). |
| test/cases/init.go | Wires new test datasets into test initialization. |
| pkg/test/measure/testdata/topn_aggregations/topn_agg_test.json | Adds TopN aggregation schema test fixture (int). |
| pkg/test/measure/testdata/topn_aggregations/service_instance_float_metric_top_bottom_3.json | Adds TopN aggregation schema test fixture (float). |
| pkg/test/measure/testdata/measures/service_instance_metric_topn_test.json | Adds measure schema fixture for int TopN tests. |
| pkg/test/measure/testdata/measures/service_instance_float_metric.json | Adds measure schema fixture for float TopN tests. |
| pkg/query/logical/measure/topn_plan_localscan.go | Converts stored TopN binary values back into typed field values using SortableValue. |
| pkg/query/logical/measure/measure_top_test.go | Updates TopQueue tests to use the new SortableValue wrapper type. |
| pkg/query/logical/measure/measure_top.go | Generalizes TopQueue to compare SortableValue rather than only int64. |
| pkg/query/logical/measure/measure_plan_top.go | Extracts sortable values from datapoints (int/float) when building TopN streams. |
| pkg/query/logical/measure/measure_plan_aggregation.go | Ensures COUNT uses int64 aggregation plan regardless of field type. |
| pkg/flow/streaming/topn_test.go | Updates streaming TopN tests for new sort-key extractor signature. |
| pkg/flow/streaming/topn.go | Adds float-aware comparator selection via WithFieldType and generalizes sort key handling. |
| pkg/flow/streaming/streaming_test.go | Updates streaming tests to supply WithFieldType and use interface sort-key extractors. |
| pkg/convert/number.go | Adds order-preserving float encoding (Float64ToOrderedBytes) for sorting/merging. |
| banyand/measure/topn_test.go | Expands TopNValue tests to cover float64 marshal/unmarshal and value handling. |
| banyand/measure/topn_post_processor_test.go | Updates post-processor tests for typed SortableValue storage. |
| banyand/measure/topn_post_processor.go | Refactors TopN post-processing/aggregation to work with int/float values. |
| banyand/measure/topn.go | Adds float field extraction and float64 TopNValue encoding/decoding support. |
| banyand/measure/merger_test.go | Updates merger test fixtures to use typed SortableValue values. |
| banyand/dquery/topn.go | Adjusts distributed TopN aggregation flow and float sorting key encoding. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #1001 +/- ##
==========================================
+ Coverage 45.97% 47.64% +1.66%
==========================================
Files 328 403 +75
Lines 55505 65333 +9828
==========================================
+ Hits 25520 31127 +5607
- Misses 27909 31252 +3343
- Partials 2076 2954 +878
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
- Utilize Go's generic types to implement topNStreamingProcessor, TopNValue, and related components. It is preferable to minimize the "switch" logic in this context.
- Convert []float to []int using encoding.Float64ListToDecimalIntList. Subsequently, marshal the int64 list following the existing logic.
There was a problem hiding this comment.
Thank you for the valuable feedback. I have refactored the code to use Go's generic types for TopNValue and related components. And for float64 encoding, used encoding.Float64ListToDecimalIntList to convert float64 values to int64 before marshaling.
Please let me know if you have any further suggestions or concerns.
There was a problem hiding this comment.
Introduce a generic type:
// TopSortKey is the constraint for TopN sort keys (int64 or float64).
type TopSortKey interface {
int64 | float64
}
topNAggregatorGroup, TopNOption and etc should this type.
| topNValue := measure.GenerateTopNValueInt() | ||
| defer measure.ReleaseTopNValueInt(topNValue) |
There was a problem hiding this comment.
Move them outside the loop. Reuse the topNValue in the loop. defer's scope is the function not the for loop body.
| return &comparableTopNItem{s.Items[s.index-1]} | ||
| } | ||
|
|
||
| func processTopNResponse[N aggregation.Number]( |
There was a problem hiding this comment.
In the old code, partial results from successful nodes were still used even when some nodes errored. Now, if 1 out of 10 nodes fails, the entire query returns an error. This is a regression for distributed scenarios.
| version: version, | ||
| } | ||
|
|
||
| if taggr.topN <= 0 { |
| originalTopN := request.GetTopN() | ||
| // Set topN to 0 to disable truncation on data nodes for all aggregation functions. | ||
| // This ensures coordinator-side aggregation can see all relevant candidates. | ||
| request.TopN = 0 |
There was a problem hiding this comment.
For high-cardinality entities, this could OOM data nodes during distributed TopN queries.
| *bus.UnImplementedHealthyListener | ||
| } | ||
|
|
||
| func (t *topNQueryProcessor) getTopNFieldType(ctx context.Context, group, topNName string) databasev1.FieldType { |
There was a problem hiding this comment.
It falls back silently to FIELD_TYPE_UNSPECIFIED → treated as int64. It's not correct.
| timestamp uint64, leftVersion, rightVersion int64, | ||
| ) ([]byte, error) { | ||
| fieldType, detectErr := DetectFieldTypeFromBinary(left) | ||
| if detectErr != nil { |
There was a problem hiding this comment.
If left fails detection but right succeeds and returns FLOAT, then left (which might be int64 data from an older format) will be deserialized as float64 — causing data corruption.
If this is non-trivial feature, paste the links/URLs to the design doc.
Update the documentation to include this new feature.
Tests(including UT, IT, E2E) are added to verify the new feature.
If it's UI related, attach the screenshots below.
If this pull request closes/resolves/fixes an existing issue, replace the issue number. Fixes [BanyanDB] Support float64 in the streaming process skywalking#10277.
Update the
CHANGESlog.