Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ set(ICEBERG_SOURCES
update/update_properties.cc
update/update_schema.cc
update/update_sort_order.cc
update/update_statistics.cc
util/bucket_util.cc
util/content_file_util.cc
util/conversions.cc
Expand Down
29 changes: 29 additions & 0 deletions src/iceberg/json_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ constexpr std::string_view kActionSetSnapshotRef = "set-snapshot-ref";
constexpr std::string_view kActionSetProperties = "set-properties";
constexpr std::string_view kActionRemoveProperties = "remove-properties";
constexpr std::string_view kActionSetLocation = "set-location";
constexpr std::string_view kActionSetStatistics = "set-statistics";
constexpr std::string_view kActionRemoveStatistics = "remove-statistics";

// TableUpdate field constants
constexpr std::string_view kUUID = "uuid";
Expand Down Expand Up @@ -1399,6 +1401,22 @@ nlohmann::json ToJson(const TableUpdate& update) {
json[kLocation] = u.location();
break;
}
case TableUpdate::Kind::kSetStatistics: {
const auto& u = internal::checked_cast<const table::SetStatistics&>(update);
json[kAction] = kActionSetStatistics;
if (u.statistics_file()) {
json[kStatistics] = ToJson(*u.statistics_file());
} else {
json[kStatistics] = nlohmann::json::value_t::null;
}
break;
}
case TableUpdate::Kind::kRemoveStatistics: {
const auto& u = internal::checked_cast<const table::RemoveStatistics&>(update);
json[kAction] = kActionRemoveStatistics;
json[kSnapshotId] = u.snapshot_id();
break;
}
}
return json;
}
Expand Down Expand Up @@ -1579,6 +1597,17 @@ Result<std::unique_ptr<TableUpdate>> TableUpdateFromJson(const nlohmann::json& j
ICEBERG_ASSIGN_OR_RAISE(auto location, GetJsonValue<std::string>(json, kLocation));
return std::make_unique<table::SetLocation>(std::move(location));
}
if (action == kActionSetStatistics) {
ICEBERG_ASSIGN_OR_RAISE(auto statistics_json,
GetJsonValue<nlohmann::json>(json, kStatistics));
ICEBERG_ASSIGN_OR_RAISE(auto statistics_file,
StatisticsFileFromJson(statistics_json));
return std::make_unique<table::SetStatistics>(std::move(statistics_file));
}
if (action == kActionRemoveStatistics) {
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));
return std::make_unique<table::RemoveStatistics>(snapshot_id);
}

return JsonParseError("Unknown table update action: {}", action);
}
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ iceberg_sources = files(
'update/update_properties.cc',
'update/update_schema.cc',
'update/update_sort_order.cc',
'update/update_statistics.cc',
'util/bucket_util.cc',
'util/content_file_util.cc',
'util/conversions.cc',
Expand Down
8 changes: 8 additions & 0 deletions src/iceberg/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "iceberg/update/update_partition_spec.h"
#include "iceberg/update/update_properties.h"
#include "iceberg/update/update_schema.h"
#include "iceberg/update/update_statistics.h"
#include "iceberg/util/macros.h"

namespace iceberg {
Expand Down Expand Up @@ -199,6 +200,13 @@ Result<std::shared_ptr<UpdateLocation>> Table::NewUpdateLocation() {
return transaction->NewUpdateLocation();
}

Result<std::shared_ptr<UpdateStatistics>> Table::NewUpdateStatistics() {
ICEBERG_ASSIGN_OR_RAISE(
auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
/*auto_commit=*/true));
return transaction->NewUpdateStatistics();
}

Result<std::shared_ptr<StagedTable>> StagedTable::Make(
TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
Expand Down
4 changes: 4 additions & 0 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
/// changes.
virtual Result<std::shared_ptr<ExpireSnapshots>> NewExpireSnapshots();

/// \brief Create a new UpdateStatistics to update the table statistics and commit the
/// changes.
virtual Result<std::shared_ptr<UpdateStatistics>> NewUpdateStatistics();

/// \brief Create a new UpdateLocation to update the table location and commit the
/// changes.
virtual Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();
Expand Down
43 changes: 40 additions & 3 deletions src/iceberg/table_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "iceberg/schema.h"
#include "iceberg/snapshot.h"
#include "iceberg/sort_order.h"
#include "iceberg/statistics_file.h"
#include "iceberg/table_properties.h"
#include "iceberg/table_update.h"
#include "iceberg/util/checked_cast.h"
Expand Down Expand Up @@ -620,6 +621,8 @@ class TableMetadataBuilder::Impl {
Status RemoveRef(const std::string& name);
Status RemoveSnapshots(const std::vector<int64_t>& snapshot_ids);
Status RemovePartitionSpecs(const std::vector<int32_t>& spec_ids);
Status SetStatistics(std::shared_ptr<StatisticsFile> statistics_file);
Status RemoveStatistics(int64_t snapshot_id);

Result<std::unique_ptr<TableMetadata>> Build();

Expand Down Expand Up @@ -1173,6 +1176,38 @@ Status TableMetadataBuilder::Impl::SetRef(const std::string& name,
return {};
}

Status TableMetadataBuilder::Impl::SetStatistics(
std::shared_ptr<StatisticsFile> statistics_file) {
ICEBERG_PRECHECK(statistics_file != nullptr, "Cannot set null statistics file");

// Find and replace existing statistics for the same snapshot_id, or add new one
auto it = std::ranges::find_if(
metadata_.statistics,
[snapshot_id = statistics_file->snapshot_id](const auto& stat) {
return stat && stat->snapshot_id == snapshot_id;
});

if (it != metadata_.statistics.end()) {
*it = statistics_file;
} else {
metadata_.statistics.push_back(statistics_file);
}

changes_.push_back(std::make_unique<table::SetStatistics>(std::move(statistics_file)));
return {};
}

Status TableMetadataBuilder::Impl::RemoveStatistics(int64_t snapshot_id) {
auto removed_count =
std::erase_if(metadata_.statistics, [snapshot_id](const auto& stat) {
return stat && stat->snapshot_id == snapshot_id;
});
if (removed_count != 0) {
changes_.push_back(std::make_unique<table::RemoveStatistics>(snapshot_id));
}
return {};
}

std::unordered_set<int64_t> TableMetadataBuilder::Impl::IntermediateSnapshotIdSet(
int64_t current_snapshot_id) const {
std::unordered_set<int64_t> added_snapshot_ids;
Expand Down Expand Up @@ -1589,12 +1624,14 @@ TableMetadataBuilder& TableMetadataBuilder::SuppressHistoricalSnapshots() {
}

TableMetadataBuilder& TableMetadataBuilder::SetStatistics(
const std::shared_ptr<StatisticsFile>& statistics_file) {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
std::shared_ptr<StatisticsFile> statistics_file) {
ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetStatistics(std::move(statistics_file)));
return *this;
}

TableMetadataBuilder& TableMetadataBuilder::RemoveStatistics(int64_t snapshot_id) {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemoveStatistics(snapshot_id));
return *this;
}

TableMetadataBuilder& TableMetadataBuilder::SetPartitionStatistics(
Expand Down
3 changes: 1 addition & 2 deletions src/iceberg/table_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,7 @@ class ICEBERG_EXPORT TableMetadataBuilder : public ErrorCollector {
///
/// \param statistics_file The statistics file to set
/// \return Reference to this builder for method chaining
TableMetadataBuilder& SetStatistics(
const std::shared_ptr<StatisticsFile>& statistics_file);
TableMetadataBuilder& SetStatistics(std::shared_ptr<StatisticsFile> statistics_file);

/// \brief Remove table statistics by snapshot ID
///
Expand Down
80 changes: 67 additions & 13 deletions src/iceberg/table_update.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
#include "iceberg/exception.h"
#include "iceberg/schema.h"
#include "iceberg/sort_order.h"
#include "iceberg/statistics_file.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_requirements.h"
#include "iceberg/util/checked_cast.h"

namespace iceberg {
TableUpdate::~TableUpdate() = default;
Expand All @@ -45,7 +47,7 @@ bool AssignUUID::Equals(const TableUpdate& other) const {
if (other.kind() != Kind::kAssignUUID) {
return false;
}
const auto& other_assign = static_cast<const AssignUUID&>(other);
const auto& other_assign = internal::checked_cast<const AssignUUID&>(other);
return uuid_ == other_assign.uuid_;
}

Expand All @@ -67,7 +69,7 @@ bool UpgradeFormatVersion::Equals(const TableUpdate& other) const {
if (other.kind() != Kind::kUpgradeFormatVersion) {
return false;
}
const auto& other_upgrade = static_cast<const UpgradeFormatVersion&>(other);
const auto& other_upgrade = internal::checked_cast<const UpgradeFormatVersion&>(other);
return format_version_ == other_upgrade.format_version_;
}

Expand Down Expand Up @@ -117,7 +119,7 @@ bool SetCurrentSchema::Equals(const TableUpdate& other) const {
if (other.kind() != Kind::kSetCurrentSchema) {
return false;
}
const auto& other_set = static_cast<const SetCurrentSchema&>(other);
const auto& other_set = internal::checked_cast<const SetCurrentSchema&>(other);
return schema_id_ == other_set.schema_id_;
}

Expand Down Expand Up @@ -167,7 +169,7 @@ bool SetDefaultPartitionSpec::Equals(const TableUpdate& other) const {
if (other.kind() != Kind::kSetDefaultPartitionSpec) {
return false;
}
const auto& other_set = static_cast<const SetDefaultPartitionSpec&>(other);
const auto& other_set = internal::checked_cast<const SetDefaultPartitionSpec&>(other);
return spec_id_ == other_set.spec_id_;
}

Expand All @@ -190,7 +192,7 @@ bool RemovePartitionSpecs::Equals(const TableUpdate& other) const {
if (other.kind() != Kind::kRemovePartitionSpecs) {
return false;
}
const auto& other_remove = static_cast<const RemovePartitionSpecs&>(other);
const auto& other_remove = internal::checked_cast<const RemovePartitionSpecs&>(other);
return spec_ids_ == other_remove.spec_ids_;
}

Expand All @@ -213,7 +215,7 @@ bool RemoveSchemas::Equals(const TableUpdate& other) const {
if (other.kind() != Kind::kRemoveSchemas) {
return false;
}
const auto& other_remove = static_cast<const RemoveSchemas&>(other);
const auto& other_remove = internal::checked_cast<const RemoveSchemas&>(other);
return schema_ids_ == other_remove.schema_ids_;
}

Expand Down Expand Up @@ -263,7 +265,7 @@ bool SetDefaultSortOrder::Equals(const TableUpdate& other) const {
if (other.kind() != Kind::kSetDefaultSortOrder) {
return false;
}
const auto& other_set = static_cast<const SetDefaultSortOrder&>(other);
const auto& other_set = internal::checked_cast<const SetDefaultSortOrder&>(other);
return sort_order_id_ == other_set.sort_order_id_;
}

Expand Down Expand Up @@ -313,7 +315,7 @@ bool RemoveSnapshots::Equals(const TableUpdate& other) const {
if (other.kind() != Kind::kRemoveSnapshots) {
return false;
}
const auto& other_remove = static_cast<const RemoveSnapshots&>(other);
const auto& other_remove = internal::checked_cast<const RemoveSnapshots&>(other);
return snapshot_ids_ == other_remove.snapshot_ids_;
}

Expand All @@ -335,7 +337,7 @@ bool RemoveSnapshotRef::Equals(const TableUpdate& other) const {
if (other.kind() != Kind::kRemoveSnapshotRef) {
return false;
}
const auto& other_remove = static_cast<const RemoveSnapshotRef&>(other);
const auto& other_remove = internal::checked_cast<const RemoveSnapshotRef&>(other);
return ref_name_ == other_remove.ref_name_;
}

Expand Down Expand Up @@ -366,7 +368,7 @@ bool SetSnapshotRef::Equals(const TableUpdate& other) const {
if (other.kind() != Kind::kSetSnapshotRef) {
return false;
}
const auto& other_set = static_cast<const SetSnapshotRef&>(other);
const auto& other_set = internal::checked_cast<const SetSnapshotRef&>(other);
return ref_name_ == other_set.ref_name_ && snapshot_id_ == other_set.snapshot_id_ &&
type_ == other_set.type_ &&
min_snapshots_to_keep_ == other_set.min_snapshots_to_keep_ &&
Expand Down Expand Up @@ -394,7 +396,7 @@ bool SetProperties::Equals(const TableUpdate& other) const {
if (other.kind() != Kind::kSetProperties) {
return false;
}
const auto& other_set = static_cast<const SetProperties&>(other);
const auto& other_set = internal::checked_cast<const SetProperties&>(other);
return updated_ == other_set.updated_;
}

Expand All @@ -416,7 +418,7 @@ bool RemoveProperties::Equals(const TableUpdate& other) const {
if (other.kind() != Kind::kRemoveProperties) {
return false;
}
const auto& other_remove = static_cast<const RemoveProperties&>(other);
const auto& other_remove = internal::checked_cast<const RemoveProperties&>(other);
return removed_ == other_remove.removed_;
}

Expand All @@ -438,12 +440,64 @@ bool SetLocation::Equals(const TableUpdate& other) const {
if (other.kind() != Kind::kSetLocation) {
return false;
}
const auto& other_set = static_cast<const SetLocation&>(other);
const auto& other_set = internal::checked_cast<const SetLocation&>(other);
return location_ == other_set.location_;
}

std::unique_ptr<TableUpdate> SetLocation::Clone() const {
return std::make_unique<SetLocation>(location_);
}

// SetStatistics

int64_t SetStatistics::snapshot_id() const { return statistics_file_->snapshot_id; }

void SetStatistics::ApplyTo(TableMetadataBuilder& builder) const {
builder.SetStatistics(statistics_file_);
}

void SetStatistics::GenerateRequirements(TableUpdateContext& context) const {
// SetStatistics doesn't generate any requirements
}

bool SetStatistics::Equals(const TableUpdate& other) const {
if (other.kind() != Kind::kSetStatistics) {
return false;
}
const auto& other_set = internal::checked_cast<const SetStatistics&>(other);
if (!statistics_file_ != !other_set.statistics_file_) {
return false;
}
if (statistics_file_ && !(*statistics_file_ == *other_set.statistics_file_)) {
return false;
}
return true;
}

std::unique_ptr<TableUpdate> SetStatistics::Clone() const {
return std::make_unique<SetStatistics>(statistics_file_);
}

// RemoveStatistics

void RemoveStatistics::ApplyTo(TableMetadataBuilder& builder) const {
builder.RemoveStatistics(snapshot_id_);
}

void RemoveStatistics::GenerateRequirements(TableUpdateContext& context) const {
// RemoveStatistics doesn't generate any requirements
}

bool RemoveStatistics::Equals(const TableUpdate& other) const {
if (other.kind() != Kind::kRemoveStatistics) {
return false;
}
const auto& other_remove = internal::checked_cast<const RemoveStatistics&>(other);
return snapshot_id_ == other_remove.snapshot_id_;
}

std::unique_ptr<TableUpdate> RemoveStatistics::Clone() const {
return std::make_unique<RemoveStatistics>(snapshot_id_);
}

} // namespace iceberg::table
Loading
Loading