Skip to content

Commit

Permalink
[Enhancement] Enhance observability in profile for paimon queries (ba…
Browse files Browse the repository at this point in the history
…ckport #55769) (#56475)
  • Loading branch information
mxdzs0612 authored Mar 4, 2025
1 parent fc14f94 commit da8647a
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.starrocks.catalog.PartitionKey;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.Type;
import com.starrocks.common.profile.Timer;
import com.starrocks.common.profile.Tracers;
import com.starrocks.connector.ColumnTypeConverter;
import com.starrocks.connector.ConnectorMetadata;
import com.starrocks.connector.HdfsEnvironment;
Expand All @@ -39,13 +41,20 @@
import com.starrocks.sql.optimizer.statistics.Statistics;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.catalog.CachingCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.metrics.Gauge;
import org.apache.paimon.metrics.Metric;
import org.apache.paimon.operation.metrics.ScanMetrics;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.system.PartitionsTable;
Expand All @@ -62,8 +71,10 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static com.starrocks.common.profile.Tracers.Module.EXTERNAL;
import static com.starrocks.connector.ConnectorTableId.CONNECTOR_ID_GENERATOR;

public class PaimonMetadata implements ConnectorMetadata {
Expand Down Expand Up @@ -248,46 +259,105 @@ public List<RemoteFileInfo> getRemoteFileInfos(Table table, List<PartitionKey> p
ReadBuilder readBuilder = paimonTable.getNativeTable().newReadBuilder();
int[] projected = fieldNames.stream().mapToInt(name -> (paimonTable.getFieldNames().indexOf(name))).toArray();
List<Predicate> predicates = extractPredicates(paimonTable, predicate);
List<Split> splits = readBuilder.withFilter(predicates).withProjection(projected).newScan().plan().splits();
InnerTableScan scan = (InnerTableScan) readBuilder.withFilter(predicates).withProjection(projected).newScan();
PaimonMetricRegistry paimonMetricRegistry = new PaimonMetricRegistry();
List<Split> splits = scan.withMetricsRegistry(paimonMetricRegistry).plan().splits();
traceScanMetrics(paimonMetricRegistry, splits, ((PaimonTable) table).getTableName(), predicates);

PaimonSplitsInfo paimonSplitsInfo = new PaimonSplitsInfo(predicates, splits);
paimonSplits.put(filter, paimonSplitsInfo);
List<RemoteFileDesc> remoteFileDescs = ImmutableList.of(
PaimonRemoteFileDesc.createPamonRemoteFileDesc(paimonSplitsInfo));
PaimonRemoteFileDesc.createPaimonRemoteFileDesc(paimonSplitsInfo));
remoteFileInfo.setFiles(remoteFileDescs);
} else {
List<RemoteFileDesc> remoteFileDescs = ImmutableList.of(
PaimonRemoteFileDesc.createPamonRemoteFileDesc(paimonSplits.get(filter)));
PaimonRemoteFileDesc.createPaimonRemoteFileDesc(paimonSplits.get(filter)));
remoteFileInfo.setFiles(remoteFileDescs);
}

return Lists.newArrayList(remoteFileInfo);
}

private void traceScanMetrics(PaimonMetricRegistry metricRegistry,
List<Split> splits,
String tableName,
List<Predicate> predicates) {
// Don't need scan metrics when selecting system table, in which metric group is null.
if (metricRegistry.getMetricGroup() == null) {
return;
}
String prefix = "Paimon.plan.";

if (paimonNativeCatalog instanceof CachingCatalog) {
CachingCatalog.CacheSizes cacheSizes = ((CachingCatalog) paimonNativeCatalog).estimatedCacheSizes();
Tracers.record(EXTERNAL, prefix + "total.cachedDatabaseNumInCatalog",
String.valueOf(cacheSizes.databaseCacheSize()));
Tracers.record(EXTERNAL, prefix + "total.cachedTableNumInCatalog",
String.valueOf(cacheSizes.tableCacheSize()));
Tracers.record(EXTERNAL, prefix + "total.cachedManifestNumInCatalog",
String.valueOf(cacheSizes.manifestCacheSize()));
Tracers.record(EXTERNAL, prefix + "total.cachedManifestBytesInCatalog",
cacheSizes.manifestCacheBytes() + " B");
Tracers.record(EXTERNAL, prefix + "total.cachedPartitionNumInCatalog",
String.valueOf(cacheSizes.partitionCacheSize()));
}

for (int i = 0; i < predicates.size(); i++) {
Tracers.record(EXTERNAL, prefix + tableName + ".filter." + i, predicates.get(i).toString());
}

Map<String, Metric> metrics = metricRegistry.getMetrics();
long manifestFileReadTime = (long) ((Gauge<?>) metrics.get(ScanMetrics.LAST_SCAN_DURATION)).getValue();
long scannedManifestFileNum = (long) ((Gauge<?>) metrics.get(ScanMetrics.LAST_SCANNED_MANIFESTS)).getValue();
long skippedDataFilesNum = (long) ((Gauge<?>) metrics.get(ScanMetrics.LAST_SCAN_SKIPPED_TABLE_FILES)).getValue();
long resultedDataFilesNum = (long) ((Gauge<?>) metrics.get(ScanMetrics.LAST_SCAN_RESULTED_TABLE_FILES)).getValue();
long manifestNumReadFromCache = (long) ((Gauge<?>) metrics.get(ScanMetrics.MANIFEST_HIT_CACHE)).getValue();
long manifestNumReadFromRemote = (long) ((Gauge<?>) metrics.get(ScanMetrics.MANIFEST_MISSED_CACHE)).getValue();

Tracers.record(EXTERNAL, prefix + tableName + "." + "manifestFileReadTime", manifestFileReadTime + "ms");
Tracers.record(EXTERNAL, prefix + tableName + "." + "scannedManifestFileNum", String.valueOf(scannedManifestFileNum));
Tracers.record(EXTERNAL, prefix + tableName + "." + "skippedDataFilesNum", String.valueOf(skippedDataFilesNum));
Tracers.record(EXTERNAL, prefix + tableName + "." + "resultedDataFilesNum", String.valueOf(resultedDataFilesNum));
Tracers.record(EXTERNAL, prefix + tableName + "." + "manifestNumReadFromCache",
String.valueOf(manifestNumReadFromCache));
Tracers.record(EXTERNAL, prefix + tableName + "." + "manifestNumReadFromRemote",
String.valueOf(manifestNumReadFromRemote));
Tracers.record(EXTERNAL, prefix + "total.resultSplitsNum", String.valueOf(splits.size()));

AtomicLong resultedTableFilesSize = new AtomicLong(0);
for (Split split : splits) {
List<DataFileMeta> dataFileMetas = ((DataSplit) split).dataFiles();
dataFileMetas.forEach(dataFileMeta -> resultedTableFilesSize.addAndGet(dataFileMeta.fileSize()));
}
Tracers.record(EXTERNAL, prefix + tableName + "." + "resultedDataFilesSize", resultedTableFilesSize.get() + " B");
}

@Override
public Statistics getTableStatistics(OptimizerContext session,
Table table,
Map<ColumnRefOperator, Column> columns,
List<PartitionKey> partitionKeys,
ScalarOperator predicate,
long limit) {
Statistics.Builder builder = Statistics.builder();
for (ColumnRefOperator columnRefOperator : columns.keySet()) {
builder.addColumnStatistic(columnRefOperator, ColumnStatistic.unknown());
}
try (Timer ignored = Tracers.watchScope(EXTERNAL, "GetPaimonTableStatistics")) {
Statistics.Builder builder = Statistics.builder();
for (ColumnRefOperator columnRefOperator : columns.keySet()) {
builder.addColumnStatistic(columnRefOperator, ColumnStatistic.unknown());
}

List<String> fieldNames = columns.keySet().stream().map(ColumnRefOperator::getName).collect(Collectors.toList());
List<RemoteFileInfo> fileInfos = GlobalStateMgr.getCurrentState().getMetadataMgr().getRemoteFileInfos(
catalogName, table, null, -1, predicate, fieldNames, limit);
PaimonRemoteFileDesc remoteFileDesc = (PaimonRemoteFileDesc) fileInfos.get(0).getFiles().get(0);
List<Split> splits = remoteFileDesc.getPaimonSplitsInfo().getPaimonSplits();
long rowCount = getRowCount(splits);
if (rowCount == 0) {
builder.setOutputRowCount(1);
} else {
builder.setOutputRowCount(rowCount);
List<String> fieldNames = columns.keySet().stream().map(ColumnRefOperator::getName).collect(Collectors.toList());
List<RemoteFileInfo> fileInfos = GlobalStateMgr.getCurrentState().getMetadataMgr().getRemoteFileInfos(
catalogName, table, null, -1, predicate, fieldNames, limit);
PaimonRemoteFileDesc remoteFileDesc = (PaimonRemoteFileDesc) fileInfos.get(0).getFiles().get(0);
List<Split> splits = remoteFileDesc.getPaimonSplitsInfo().getPaimonSplits();
long rowCount = getRowCount(splits);
if (rowCount == 0) {
builder.setOutputRowCount(1);
} else {
builder.setOutputRowCount(rowCount);
}
return builder.build();
}

return builder.build();
}

public static long getRowCount(List<? extends Split> splits) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.connector.paimon;

import org.apache.paimon.metrics.Metric;
import org.apache.paimon.metrics.MetricGroup;
import org.apache.paimon.metrics.MetricGroupImpl;
import org.apache.paimon.metrics.MetricRegistry;

import java.util.Map;

public class PaimonMetricRegistry extends MetricRegistry {
private MetricGroup metricGroup;

@Override
protected MetricGroup createMetricGroup(String groupName, Map<String, String> variables) {
MetricGroup metricGroup = new MetricGroupImpl(groupName, variables);
this.metricGroup = metricGroup;
return metricGroup;
}

public MetricGroup getMetricGroup() {
return metricGroup;
}

public Map<String, Metric> getMetrics() {
return metricGroup.getMetrics();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ private PaimonRemoteFileDesc(PaimonSplitsInfo paimonSplitsInfo) {
this.paimonSplitsInfo = paimonSplitsInfo;
}

public static PaimonRemoteFileDesc createPamonRemoteFileDesc(PaimonSplitsInfo paimonSplitsInfo) {
public static PaimonRemoteFileDesc createPaimonRemoteFileDesc(PaimonSplitsInfo paimonSplitsInfo) {
return new PaimonRemoteFileDesc(paimonSplitsInfo);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.starrocks.catalog.Column;
import com.starrocks.catalog.PaimonTable;
import com.starrocks.catalog.Type;
import com.starrocks.common.profile.Timer;
import com.starrocks.common.profile.Tracers;
import com.starrocks.connector.CatalogConnector;
import com.starrocks.connector.RemoteFileDesc;
import com.starrocks.connector.RemoteFileInfo;
Expand Down Expand Up @@ -64,6 +66,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static com.starrocks.common.profile.Tracers.Module.EXTERNAL;
import static com.starrocks.thrift.TExplainLevel.VERBOSE;
import static java.nio.charset.StandardCharsets.UTF_8;

Expand Down Expand Up @@ -126,8 +129,12 @@ public long getEstimatedLength(long rowCount, TupleDescriptor tupleDescriptor) {
public void setupScanRangeLocations(TupleDescriptor tupleDescriptor, ScalarOperator predicate) {
List<String> fieldNames =
tupleDescriptor.getSlots().stream().map(s -> s.getColumn().getName()).collect(Collectors.toList());
List<RemoteFileInfo> fileInfos = GlobalStateMgr.getCurrentState().getMetadataMgr().getRemoteFileInfos(
paimonTable.getCatalogName(), paimonTable, null, -1, predicate, fieldNames, -1);
List<RemoteFileInfo> fileInfos;
try (Timer ignored = Tracers.watchScope(EXTERNAL, paimonTable.getTableName() + ".getPaimonRemoteFileInfos")) {
fileInfos = GlobalStateMgr.getCurrentState().getMetadataMgr().getRemoteFileInfos(
paimonTable.getCatalogName(), paimonTable, null, -1, predicate, fieldNames, -1);
}

PaimonRemoteFileDesc remoteFileDesc = (PaimonRemoteFileDesc) fileInfos.get(0).getFiles().get(0);
PaimonSplitsInfo splitsInfo = remoteFileDesc.getPaimonSplitsInfo();
String predicateInfo = encodeObjectToString(splitsInfo.getPredicate());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableScan;
Expand Down Expand Up @@ -274,7 +275,8 @@ public InternalRow next() {

@Test
public void testGetRemoteFileInfos(@Mocked FileStoreTable paimonNativeTable,
@Mocked ReadBuilder readBuilder)
@Mocked ReadBuilder readBuilder,
@Mocked InnerTableScan scan)
throws Catalog.TableNotExistException {
new MockUp<PaimonMetadata>() {
@Mock
Expand All @@ -290,6 +292,8 @@ public long getTableCreateTime(String dbName, String tblName) {
result = readBuilder;
readBuilder.withFilter((List<Predicate>) any).withProjection((int[]) any).newScan().plan().splits();
result = splits;
readBuilder.newScan();
result = scan;
}
};
PaimonTable paimonTable = (PaimonTable) metadata.getTable("db1", "tbl1");
Expand Down Expand Up @@ -426,7 +430,7 @@ public List<RemoteFileInfo> getRemoteFileInfos(String catalogName, Table table,
long snapshotId, ScalarOperator predicate, List<String> fieldNames,
long limit) {
return Lists.newArrayList(RemoteFileInfo.builder()
.setFiles(Lists.newArrayList(PaimonRemoteFileDesc.createPamonRemoteFileDesc(
.setFiles(Lists.newArrayList(PaimonRemoteFileDesc.createPaimonRemoteFileDesc(
new PaimonSplitsInfo(null, Lists.newArrayList((Split) splits.get(0))))))
.build());
}
Expand Down

0 comments on commit da8647a

Please sign in to comment.