diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonMetadata.java b/fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonMetadata.java index f1fabf70835715..4c3a290d62ab16 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonMetadata.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonMetadata.java @@ -22,6 +22,8 @@ import com.starrocks.catalog.PartitionKey; import com.starrocks.catalog.Table; import com.starrocks.catalog.Type; +import com.starrocks.common.profile.Timer; +import com.starrocks.common.profile.Tracers; import com.starrocks.connector.ColumnTypeConverter; import com.starrocks.connector.ConnectorMetadata; import com.starrocks.connector.HdfsEnvironment; @@ -39,13 +41,20 @@ import com.starrocks.sql.optimizer.statistics.Statistics; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.paimon.catalog.CachingCatalog; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.metrics.Gauge; +import org.apache.paimon.metrics.Metric; +import org.apache.paimon.operation.metrics.ScanMetrics; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.reader.RecordReaderIterator; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.system.PartitionsTable; @@ -62,8 +71,10 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import static com.starrocks.common.profile.Tracers.Module.EXTERNAL; import static com.starrocks.connector.ConnectorTableId.CONNECTOR_ID_GENERATOR; public class PaimonMetadata implements ConnectorMetadata { @@ -248,21 +259,79 @@ public List getRemoteFileInfos(Table table, List p ReadBuilder readBuilder = paimonTable.getNativeTable().newReadBuilder(); int[] projected = fieldNames.stream().mapToInt(name -> (paimonTable.getFieldNames().indexOf(name))).toArray(); List predicates = extractPredicates(paimonTable, predicate); - List splits = readBuilder.withFilter(predicates).withProjection(projected).newScan().plan().splits(); + InnerTableScan scan = (InnerTableScan) readBuilder.withFilter(predicates).withProjection(projected).newScan(); + PaimonMetricRegistry paimonMetricRegistry = new PaimonMetricRegistry(); + List splits = scan.withMetricsRegistry(paimonMetricRegistry).plan().splits(); + traceScanMetrics(paimonMetricRegistry, splits, ((PaimonTable) table).getTableName(), predicates); + PaimonSplitsInfo paimonSplitsInfo = new PaimonSplitsInfo(predicates, splits); paimonSplits.put(filter, paimonSplitsInfo); List remoteFileDescs = ImmutableList.of( - PaimonRemoteFileDesc.createPamonRemoteFileDesc(paimonSplitsInfo)); + PaimonRemoteFileDesc.createPaimonRemoteFileDesc(paimonSplitsInfo)); remoteFileInfo.setFiles(remoteFileDescs); } else { List remoteFileDescs = ImmutableList.of( - PaimonRemoteFileDesc.createPamonRemoteFileDesc(paimonSplits.get(filter))); + PaimonRemoteFileDesc.createPaimonRemoteFileDesc(paimonSplits.get(filter))); remoteFileInfo.setFiles(remoteFileDescs); } return Lists.newArrayList(remoteFileInfo); } + private void traceScanMetrics(PaimonMetricRegistry metricRegistry, + List splits, + String tableName, + List predicates) { + // Don't need scan metrics when selecting system table, in which metric group is null. + if (metricRegistry.getMetricGroup() == null) { + return; + } + String prefix = "Paimon.plan."; + + if (paimonNativeCatalog instanceof CachingCatalog) { + CachingCatalog.CacheSizes cacheSizes = ((CachingCatalog) paimonNativeCatalog).estimatedCacheSizes(); + Tracers.record(EXTERNAL, prefix + "total.cachedDatabaseNumInCatalog", + String.valueOf(cacheSizes.databaseCacheSize())); + Tracers.record(EXTERNAL, prefix + "total.cachedTableNumInCatalog", + String.valueOf(cacheSizes.tableCacheSize())); + Tracers.record(EXTERNAL, prefix + "total.cachedManifestNumInCatalog", + String.valueOf(cacheSizes.manifestCacheSize())); + Tracers.record(EXTERNAL, prefix + "total.cachedManifestBytesInCatalog", + cacheSizes.manifestCacheBytes() + " B"); + Tracers.record(EXTERNAL, prefix + "total.cachedPartitionNumInCatalog", + String.valueOf(cacheSizes.partitionCacheSize())); + } + + for (int i = 0; i < predicates.size(); i++) { + Tracers.record(EXTERNAL, prefix + tableName + ".filter." + i, predicates.get(i).toString()); + } + + Map metrics = metricRegistry.getMetrics(); + long manifestFileReadTime = (long) ((Gauge) metrics.get(ScanMetrics.LAST_SCAN_DURATION)).getValue(); + long scannedManifestFileNum = (long) ((Gauge) metrics.get(ScanMetrics.LAST_SCANNED_MANIFESTS)).getValue(); + long skippedDataFilesNum = (long) ((Gauge) metrics.get(ScanMetrics.LAST_SCAN_SKIPPED_TABLE_FILES)).getValue(); + long resultedDataFilesNum = (long) ((Gauge) metrics.get(ScanMetrics.LAST_SCAN_RESULTED_TABLE_FILES)).getValue(); + long manifestNumReadFromCache = (long) ((Gauge) metrics.get(ScanMetrics.MANIFEST_HIT_CACHE)).getValue(); + long manifestNumReadFromRemote = (long) ((Gauge) metrics.get(ScanMetrics.MANIFEST_MISSED_CACHE)).getValue(); + + Tracers.record(EXTERNAL, prefix + tableName + "." + "manifestFileReadTime", manifestFileReadTime + "ms"); + Tracers.record(EXTERNAL, prefix + tableName + "." + "scannedManifestFileNum", String.valueOf(scannedManifestFileNum)); + Tracers.record(EXTERNAL, prefix + tableName + "." + "skippedDataFilesNum", String.valueOf(skippedDataFilesNum)); + Tracers.record(EXTERNAL, prefix + tableName + "." + "resultedDataFilesNum", String.valueOf(resultedDataFilesNum)); + Tracers.record(EXTERNAL, prefix + tableName + "." + "manifestNumReadFromCache", + String.valueOf(manifestNumReadFromCache)); + Tracers.record(EXTERNAL, prefix + tableName + "." + "manifestNumReadFromRemote", + String.valueOf(manifestNumReadFromRemote)); + Tracers.record(EXTERNAL, prefix + "total.resultSplitsNum", String.valueOf(splits.size())); + + AtomicLong resultedTableFilesSize = new AtomicLong(0); + for (Split split : splits) { + List dataFileMetas = ((DataSplit) split).dataFiles(); + dataFileMetas.forEach(dataFileMeta -> resultedTableFilesSize.addAndGet(dataFileMeta.fileSize())); + } + Tracers.record(EXTERNAL, prefix + tableName + "." + "resultedDataFilesSize", resultedTableFilesSize.get() + " B"); + } + @Override public Statistics getTableStatistics(OptimizerContext session, Table table, @@ -270,24 +339,25 @@ public Statistics getTableStatistics(OptimizerContext session, List partitionKeys, ScalarOperator predicate, long limit) { - Statistics.Builder builder = Statistics.builder(); - for (ColumnRefOperator columnRefOperator : columns.keySet()) { - builder.addColumnStatistic(columnRefOperator, ColumnStatistic.unknown()); - } + try (Timer ignored = Tracers.watchScope(EXTERNAL, "GetPaimonTableStatistics")) { + Statistics.Builder builder = Statistics.builder(); + for (ColumnRefOperator columnRefOperator : columns.keySet()) { + builder.addColumnStatistic(columnRefOperator, ColumnStatistic.unknown()); + } - List fieldNames = columns.keySet().stream().map(ColumnRefOperator::getName).collect(Collectors.toList()); - List fileInfos = GlobalStateMgr.getCurrentState().getMetadataMgr().getRemoteFileInfos( - catalogName, table, null, -1, predicate, fieldNames, limit); - PaimonRemoteFileDesc remoteFileDesc = (PaimonRemoteFileDesc) fileInfos.get(0).getFiles().get(0); - List splits = remoteFileDesc.getPaimonSplitsInfo().getPaimonSplits(); - long rowCount = getRowCount(splits); - if (rowCount == 0) { - builder.setOutputRowCount(1); - } else { - builder.setOutputRowCount(rowCount); + List fieldNames = columns.keySet().stream().map(ColumnRefOperator::getName).collect(Collectors.toList()); + List fileInfos = GlobalStateMgr.getCurrentState().getMetadataMgr().getRemoteFileInfos( + catalogName, table, null, -1, predicate, fieldNames, limit); + PaimonRemoteFileDesc remoteFileDesc = (PaimonRemoteFileDesc) fileInfos.get(0).getFiles().get(0); + List splits = remoteFileDesc.getPaimonSplitsInfo().getPaimonSplits(); + long rowCount = getRowCount(splits); + if (rowCount == 0) { + builder.setOutputRowCount(1); + } else { + builder.setOutputRowCount(rowCount); + } + return builder.build(); } - - return builder.build(); } public static long getRowCount(List splits) { diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonMetricRegistry.java b/fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonMetricRegistry.java new file mode 100644 index 00000000000000..3078b251ab2546 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonMetricRegistry.java @@ -0,0 +1,41 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.connector.paimon; + +import org.apache.paimon.metrics.Metric; +import org.apache.paimon.metrics.MetricGroup; +import org.apache.paimon.metrics.MetricGroupImpl; +import org.apache.paimon.metrics.MetricRegistry; + +import java.util.Map; + +public class PaimonMetricRegistry extends MetricRegistry { + private MetricGroup metricGroup; + + @Override + protected MetricGroup createMetricGroup(String groupName, Map variables) { + MetricGroup metricGroup = new MetricGroupImpl(groupName, variables); + this.metricGroup = metricGroup; + return metricGroup; + } + + public MetricGroup getMetricGroup() { + return metricGroup; + } + + public Map getMetrics() { + return metricGroup.getMetrics(); + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonRemoteFileDesc.java b/fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonRemoteFileDesc.java index 9dce5f56ba5a4d..caaeec98a6f525 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonRemoteFileDesc.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonRemoteFileDesc.java @@ -24,7 +24,7 @@ private PaimonRemoteFileDesc(PaimonSplitsInfo paimonSplitsInfo) { this.paimonSplitsInfo = paimonSplitsInfo; } - public static PaimonRemoteFileDesc createPamonRemoteFileDesc(PaimonSplitsInfo paimonSplitsInfo) { + public static PaimonRemoteFileDesc createPaimonRemoteFileDesc(PaimonSplitsInfo paimonSplitsInfo) { return new PaimonRemoteFileDesc(paimonSplitsInfo); } diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/PaimonScanNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/PaimonScanNode.java index 24dd6cc269119f..6ab05fda008027 100644 --- a/fe/fe-core/src/main/java/com/starrocks/planner/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/com/starrocks/planner/PaimonScanNode.java @@ -23,6 +23,8 @@ import com.starrocks.catalog.Column; import com.starrocks.catalog.PaimonTable; import com.starrocks.catalog.Type; +import com.starrocks.common.profile.Timer; +import com.starrocks.common.profile.Tracers; import com.starrocks.connector.CatalogConnector; import com.starrocks.connector.RemoteFileDesc; import com.starrocks.connector.RemoteFileInfo; @@ -64,6 +66,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import static com.starrocks.common.profile.Tracers.Module.EXTERNAL; import static com.starrocks.thrift.TExplainLevel.VERBOSE; import static java.nio.charset.StandardCharsets.UTF_8; @@ -126,8 +129,12 @@ public long getEstimatedLength(long rowCount, TupleDescriptor tupleDescriptor) { public void setupScanRangeLocations(TupleDescriptor tupleDescriptor, ScalarOperator predicate) { List fieldNames = tupleDescriptor.getSlots().stream().map(s -> s.getColumn().getName()).collect(Collectors.toList()); - List fileInfos = GlobalStateMgr.getCurrentState().getMetadataMgr().getRemoteFileInfos( - paimonTable.getCatalogName(), paimonTable, null, -1, predicate, fieldNames, -1); + List fileInfos; + try (Timer ignored = Tracers.watchScope(EXTERNAL, paimonTable.getTableName() + ".getPaimonRemoteFileInfos")) { + fileInfos = GlobalStateMgr.getCurrentState().getMetadataMgr().getRemoteFileInfos( + paimonTable.getCatalogName(), paimonTable, null, -1, predicate, fieldNames, -1); + } + PaimonRemoteFileDesc remoteFileDesc = (PaimonRemoteFileDesc) fileInfos.get(0).getFiles().get(0); PaimonSplitsInfo splitsInfo = remoteFileDesc.getPaimonSplitsInfo(); String predicateInfo = encodeObjectToString(splitsInfo.getPredicate()); diff --git a/fe/fe-core/src/test/java/com/starrocks/connector/paimon/PaimonMetadataTest.java b/fe/fe-core/src/test/java/com/starrocks/connector/paimon/PaimonMetadataTest.java index 6367d2ad086010..4bc80721a872b5 100644 --- a/fe/fe-core/src/test/java/com/starrocks/connector/paimon/PaimonMetadataTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/connector/paimon/PaimonMetadataTest.java @@ -52,6 +52,7 @@ import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableScan; @@ -274,7 +275,8 @@ public InternalRow next() { @Test public void testGetRemoteFileInfos(@Mocked FileStoreTable paimonNativeTable, - @Mocked ReadBuilder readBuilder) + @Mocked ReadBuilder readBuilder, + @Mocked InnerTableScan scan) throws Catalog.TableNotExistException { new MockUp() { @Mock @@ -290,6 +292,8 @@ public long getTableCreateTime(String dbName, String tblName) { result = readBuilder; readBuilder.withFilter((List) any).withProjection((int[]) any).newScan().plan().splits(); result = splits; + readBuilder.newScan(); + result = scan; } }; PaimonTable paimonTable = (PaimonTable) metadata.getTable("db1", "tbl1"); @@ -426,7 +430,7 @@ public List getRemoteFileInfos(String catalogName, Table table, long snapshotId, ScalarOperator predicate, List fieldNames, long limit) { return Lists.newArrayList(RemoteFileInfo.builder() - .setFiles(Lists.newArrayList(PaimonRemoteFileDesc.createPamonRemoteFileDesc( + .setFiles(Lists.newArrayList(PaimonRemoteFileDesc.createPaimonRemoteFileDesc( new PaimonSplitsInfo(null, Lists.newArrayList((Split) splits.get(0)))))) .build()); }