PR changed-lines coverage: 45.01% (658/1462, 0 noise lines excluded) Uncovered changed code (with context): ================================================================================ src/Databases/DataLake/GlueCatalog.cpp ================================================================================ --- uncovered block 545-548 --- 543 | storage_settings->loadFromSettingsChanges(settings.allChanged()); 544 | >> 545 | try >> 546 | { >> 547 | auto [metadata_version, metadata_path, compression_method] = DB::Iceberg::getLatestOrExplicitMetadataFileAndVersion( >> 548 | object_storage, table_path, *storage_settings, nullptr, getContext(), log.get(), std::nullopt, DB::CompressionMethod::None); 549 | 550 | LOG_TRACE(log, "Resolved metadata path '{}' (version {}) for table location '{}'", metadata_path, metadata_version, table_location); --- uncovered block 550-550 --- 548 | object_storage, table_path, *storage_settings, nullptr, getContext(), log.get(), std::nullopt, DB::CompressionMethod::None); 549 | >> 550 | LOG_TRACE(log, "Resolved metadata path '{}' (version {}) for table location '{}'", metadata_path, metadata_version, table_location); 551 | 552 | return "s3://" + bucket_name + "/" + metadata_path; ================================================================================ src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp ================================================================================ --- uncovered block 377-379 --- 375 | 376 | ObjectMetadata ReadBufferFromAzureBlobStorage::getObjectMetadataFromTheLastRequest() const >> 377 | { >> 378 | if (!last_object_metadata.get()->has_value()) >> 379 | throw Exception(ErrorCodes::NOT_INITIALIZED, "No Azure object metadata available because there were no successful requests"); 380 | 381 | return last_object_metadata.get()->value(); --- uncovered block 381-381 --- 379 | throw Exception(ErrorCodes::NOT_INITIALIZED, "No Azure object metadata available because there were no successful requests"); 380 | >> 381 | return last_object_metadata.get()->value(); 382 | } 383 | ================================================================================ src/Interpreters/IcebergMetadataLog.cpp ================================================================================ --- uncovered block 107-113 --- 105 | DB::IcebergMetadataLogElement{ 106 | .current_time = spec.tv_sec, >> 107 | .query_id = local_context->getCurrentQueryId(), >> 108 | .content_type = row_log_level, >> 109 | .table_path = table_path, >> 110 | .file_path = file_path.serialize(), >> 111 | .metadata_content = row, >> 112 | .row_in_file = row_in_file, >> 113 | .pruning_status = pruning_status}); 114 | } 115 | } ================================================================================ src/Storages/ObjectStorage/DataLakes/Common/AvroForIcebergDeserializer.cpp ================================================================================ --- uncovered block 251-251 --- 249 | /*equality_ids*/ std::nullopt, 250 | sort_order_id); >> 251 | } 252 | case FileContentType::POSITION_DELETE: { 253 | /// reference_file_path can be absent in schema for some reason, though it is present in specification: https://iceberg.apache.org/spec/#manifests --- uncovered block 261-267 --- 259 | Field reference_file_path_field = getValueFromRowByName(row_index, c_data_file_referenced_data_file); 260 | if (!reference_file_path_field.isNull()) >> 261 | { >> 262 | lower_reference_data_file_path.emplace( >> 263 | Iceberg::IcebergPathFromMetadata::deserialize(reference_file_path_field.safeGet())); >> 264 | upper_reference_data_file_path.emplace( >> 265 | Iceberg::IcebergPathFromMetadata::deserialize(reference_file_path_field.safeGet())); >> 266 | bounds_set_by_referenced_data_file = true; >> 267 | } 268 | } 269 | if (!bounds_set_by_referenced_data_file) ================================================================================ src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp ================================================================================ --- uncovered block 44-45 --- 42 | explicit ManifestFilePlan(Poco::JSON::Array::Ptr schema_) 43 | : statistics(schema_) >> 44 | { >> 45 | } 46 | 47 | Iceberg::IcebergPathFromMetadata path; --- uncovered block 120-120 --- 118 | CompressionMethod compression_method) 119 | { >> 120 | LoggerPtr log = getLogger("IcebergCompaction::getPlan"); 121 | 122 | Plan plan; --- uncovered block 122-123 --- 120 | LoggerPtr log = getLogger("IcebergCompaction::getPlan"); 121 | >> 122 | Plan plan; >> 123 | plan.generator = FileNamesGenerator(persistent_table_components.path_resolver.getTableLocation(), false, compression_method, write_format); 124 | 125 | const auto [metadata_version, metadata_file_path, _] = getLatestOrExplicitMetadataFileAndVersion( --- uncovered block 125-126 --- 123 | plan.generator = FileNamesGenerator(persistent_table_components.path_resolver.getTableLocation(), false, compression_method, write_format); 124 | >> 125 | const auto [metadata_version, metadata_file_path, _] = getLatestOrExplicitMetadataFileAndVersion( >> 126 | object_storage, 127 | persistent_table_components.table_path, 128 | data_lake_settings, --- uncovered block 129-133 --- 127 | persistent_table_components.table_path, 128 | data_lake_settings, >> 129 | persistent_table_components.metadata_cache, >> 130 | context, >> 131 | log.get(), >> 132 | persistent_table_components.table_uuid, >> 133 | persistent_table_components.metadata_compression_method); 134 | 135 | Poco::JSON::Object::Ptr initial_metadata_object --- uncovered block 135-136 --- 133 | persistent_table_components.metadata_compression_method); 134 | >> 135 | Poco::JSON::Object::Ptr initial_metadata_object >> 136 | = getMetadataJSONObject(metadata_file_path, object_storage, persistent_table_components.metadata_cache, context, log, compression_method, persistent_table_components.table_uuid); 137 | 138 | if (initial_metadata_object->getValue(Iceberg::f_format_version) < 2) --- uncovered block 152-152 --- 150 | } 151 | } >> 152 | plan.initial_metadata_object = initial_metadata_object; 153 | 154 | std::vector all_positional_delete_files; --- uncovered block 154-158 --- 152 | plan.initial_metadata_object = initial_metadata_object; 153 | >> 154 | std::vector all_positional_delete_files; >> 155 | std::unordered_map> manifest_files; >> 156 | for (const auto & snapshot : snapshots_info) >> 157 | { >> 158 | auto manifest_list = getManifestList(object_storage, persistent_table_components, context, snapshot.manifest_list_path, log); 159 | for (const auto & manifest_file : manifest_list) 160 | { --- uncovered block 179-180 --- 177 | { 178 | auto partition_index = plan.partition_encoder.encodePartition(data_file->parsed_entry->partition_key_value); >> 179 | if (plan.partitions.size() <= partition_index) >> 180 | plan.partitions.push_back({}); 181 | 182 | IcebergDataObjectInfoPtr data_object_info = std::make_shared( --- uncovered block 182-186 --- 180 | plan.partitions.push_back({}); 181 | >> 182 | IcebergDataObjectInfoPtr data_object_info = std::make_shared( >> 183 | data_file, persistent_table_components.path_resolver.resolve(data_file->parsed_entry->file_path_key), 0); >> 184 | std::shared_ptr data_file_ptr; >> 185 | if (!plan.path_to_data_file.contains(manifest_file.manifest_file_path)) >> 186 | { 187 | data_file_ptr = std::make_shared(DataFilePlan{ 188 | .data_object_info = data_object_info, --- uncovered block 210-217 --- 208 | 209 | std::vector result_delete_files; >> 210 | for (auto & data_file : plan.partitions[partition_index]) >> 211 | { >> 212 | if (data_file->data_object_info->info.sequence_number <= delete_file->sequence_number) >> 213 | data_file->data_object_info->addPositionDeleteObject( >> 214 | delete_file, persistent_table_components.path_resolver.resolve(delete_file->parsed_entry->file_path_key)); >> 215 | } >> 216 | } >> 217 | plan.history = std::move(snapshots_info); 218 | plan.need_optimize = !all_positional_delete_files.empty(); 219 | return plan; --- uncovered block 262-262 --- 260 | true /* is_remote_fs */, 261 | chooseCompressionMethod(data_file->data_object_info->getPath(), toContentEncodingName(write_compression_method)), >> 262 | false); 263 | 264 | auto write_buffer = object_storage->writeObject( --- uncovered block 264-268 --- 262 | false); 263 | >> 264 | auto write_buffer = object_storage->writeObject( >> 265 | StoredObject(path_resolver.resolve(data_file->patched_path)), >> 266 | WriteMode::Rewrite, >> 267 | std::nullopt, >> 268 | DBMS_DEFAULT_BUFFER_SIZE, 269 | context->getWriteSettings()); 270 | --- uncovered block 296-296 --- 294 | write_buffer->finalize(); 295 | } >> 296 | } 297 | 298 | void writeMetadataFiles( --- uncovered block 300-301 --- 298 | void writeMetadataFiles( 299 | Plan & plan, const IcebergPathResolver & path_resolver, ObjectStoragePtr object_storage, ContextPtr context, SharedHeader sample_block_, String write_format, String table_path) >> 300 | { >> 301 | auto log = getLogger("IcebergCompaction"); 302 | 303 | ColumnsDescription columns_description = ColumnsDescription::fromNamesAndTypes(sample_block_->getNamesAndTypes()); --- uncovered block 318-321 --- 316 | } 317 | >> 318 | MetadataGenerator metadata_generator(metadata_object); >> 319 | std::vector new_snapshots; >> 320 | auto generated_metadata_info = plan.generator.generateMetadataPathWithInfo(); >> 321 | std::unordered_map snapshot_id_to_snapshot; 322 | 323 | std::unordered_map snapshot_id_to_records_count; --- uncovered block 323-323 --- 321 | std::unordered_map snapshot_id_to_snapshot; 322 | >> 323 | std::unordered_map snapshot_id_to_records_count; 324 | 325 | for (const auto & history_record : plan.history) --- uncovered block 336-341 --- 334 | total_records_count += data_file->new_records_count; 335 | >> 336 | auto new_snapshot = metadata_generator.generateNextMetadata( >> 337 | plan.generator, >> 338 | generated_metadata_info.path, >> 339 | history_record.parent_id, >> 340 | history_record.added_files, >> 341 | total_records_count, 342 | history_record.added_files_size, 343 | history_record.num_partitions, --- uncovered block 351-351 --- 349 | new_snapshots.push_back(new_snapshot); 350 | snapshot_id_to_snapshot[history_record.snapshot_id] = new_snapshot.snapshot; >> 351 | } 352 | 353 | Poco::JSON::Object::Ptr initial_metadata_object = plan.initial_metadata_object; --- uncovered block 353-355 --- 351 | } 352 | >> 353 | Poco::JSON::Object::Ptr initial_metadata_object = plan.initial_metadata_object; >> 354 | std::unordered_map manifest_file_renamings; >> 355 | std::unordered_map manifest_file_sizes; 356 | 357 | { --- uncovered block 357-360 --- 355 | std::unordered_map manifest_file_sizes; 356 | >> 357 | { >> 358 | std::unordered_map, std::unordered_set> grouped_by_manifest_files_result; >> 359 | std::unordered_map, size_t> grouped_by_manifest_files_partitions; >> 360 | std::unordered_map, size_t> partition_values; 361 | 362 | for (size_t i = 0; i < plan.partitions.size(); ++i) --- uncovered block 365-371 --- 363 | { 364 | const auto & partition = plan.partitions[i]; >> 365 | for (const auto & data_file : partition) >> 366 | { >> 367 | grouped_by_manifest_files_partitions[data_file->manifest_list] = i; >> 368 | grouped_by_manifest_files_result[data_file->manifest_list].insert(data_file->patched_path); >> 369 | partition_values[data_file->manifest_list] = i; >> 370 | } >> 371 | } 372 | 373 | auto partition_spec_id = initial_metadata_object->getValue(f_default_spec_id); --- uncovered block 394-402 --- 392 | } 393 | >> 394 | for (auto & [manifest_entry, data_filenames] : grouped_by_manifest_files_result) >> 395 | { >> 396 | manifest_entry->patched_path = plan.generator.generateManifestEntryName(); >> 397 | manifest_file_renamings[manifest_entry->path] = manifest_entry->patched_path; >> 398 | auto buffer_manifest_entry = object_storage->writeObject( >> 399 | StoredObject(path_resolver.resolve(manifest_entry->patched_path)), >> 400 | WriteMode::Rewrite, >> 401 | std::nullopt, >> 402 | DBMS_DEFAULT_BUFFER_SIZE, 403 | context->getWriteSettings()); 404 | --- uncovered block 422-423 --- 420 | partititon_spec, 421 | partition_spec_id, >> 422 | *buffer_manifest_entry, >> 423 | Iceberg::FileContentType::DATA); 424 | 425 | buffer_manifest_entry->finalize(); --- uncovered block 425-435 --- 423 | Iceberg::FileContentType::DATA); 424 | >> 425 | buffer_manifest_entry->finalize(); >> 426 | auto manifest_bytes = buffer_manifest_entry->count(); >> 427 | if (manifest_bytes == 0) >> 428 | { >> 429 | auto file_metadata = object_storage->getObjectMetadata( >> 430 | path_resolver.resolve(manifest_entry->patched_path), /*with_tags=*/ false); >> 431 | manifest_bytes = file_metadata.size_bytes; >> 432 | } >> 433 | manifest_file_sizes[manifest_entry->patched_path] += manifest_bytes; >> 434 | } >> 435 | } 436 | 437 | std::unordered_map manifest_list_renamings; --- uncovered block 437-441 --- 435 | } 436 | >> 437 | std::unordered_map manifest_list_renamings; >> 438 | for (size_t i = 0; i < plan.history.size(); ++i) >> 439 | { >> 440 | if (plan.history[i].added_files == 0) >> 441 | continue; 442 | 443 | manifest_list_renamings[plan.history[i].manifest_list_path] = new_snapshots[i].manifest_list_path; --- uncovered block 443-444 --- 441 | continue; 442 | >> 443 | manifest_list_renamings[plan.history[i].manifest_list_path] = new_snapshots[i].manifest_list_path; >> 444 | } 445 | 446 | for (size_t i = 0; i < plan.history.size(); ++i) --- uncovered block 446-446 --- 444 | } 445 | >> 446 | for (size_t i = 0; i < plan.history.size(); ++i) 447 | { 448 | if (plan.history[i].added_files == 0) --- uncovered block 451-482 --- 449 | continue; 450 | >> 451 | auto initial_manifest_list_name = plan.history[i].manifest_list_path; >> 452 | auto initial_manifest_entries = plan.manifest_list_to_manifest_files[initial_manifest_list_name]; >> 453 | auto renamed_manifest_list = manifest_list_renamings[initial_manifest_list_name]; >> 454 | std::vector renamed_manifest_entries; >> 455 | for (const auto & initial_manifest_entry : initial_manifest_entries) >> 456 | { >> 457 | auto renamed_manifest_entry = manifest_file_renamings[initial_manifest_entry]; >> 458 | if (!renamed_manifest_entry.empty()) >> 459 | { >> 460 | renamed_manifest_entries.push_back(renamed_manifest_entry); >> 461 | } >> 462 | } >> 463 | std::vector per_manifest_sizes; >> 464 | for (const auto & entry : renamed_manifest_entries) >> 465 | per_manifest_sizes.push_back(manifest_file_sizes[entry]); >> 466 | auto buffer_manifest_list = object_storage->writeObject( >> 467 | StoredObject(path_resolver.resolve(renamed_manifest_list)), >> 468 | WriteMode::Rewrite, >> 469 | std::nullopt, >> 470 | DBMS_DEFAULT_BUFFER_SIZE, >> 471 | context->getWriteSettings()); >> 472 | generateManifestList( >> 473 | path_resolver, >> 474 | metadata_object, >> 475 | object_storage, >> 476 | context, >> 477 | renamed_manifest_entries, >> 478 | new_snapshots[i].snapshot, >> 479 | per_manifest_sizes, >> 480 | *buffer_manifest_list, >> 481 | Iceberg::FileContentType::DATA, >> 482 | false); 483 | buffer_manifest_list->finalize(); 484 | } --- uncovered block 489-489 --- 487 | std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM 488 | Poco::JSON::Stringifier::stringify(metadata_object, oss, 4); >> 489 | std::string json_representation = removeEscapedSlashes(oss.str()); 490 | 491 | auto buffer_metadata = object_storage->writeObject( --- uncovered block 491-495 --- 489 | std::string json_representation = removeEscapedSlashes(oss.str()); 490 | >> 491 | auto buffer_metadata = object_storage->writeObject( >> 492 | StoredObject(path_resolver.resolve(generated_metadata_info.path)), >> 493 | WriteMode::Rewrite, >> 494 | std::nullopt, >> 495 | DBMS_DEFAULT_BUFFER_SIZE, 496 | context->getWriteSettings()); 497 | --- uncovered block 544-555 --- 542 | auto old_files = getOldFiles(object_storage_, persistent_table_components.table_path); 543 | writeDataFiles( >> 544 | plan, >> 545 | sample_block_, >> 546 | object_storage_, >> 547 | persistent_table_components.path_resolver, >> 548 | format_settings_, >> 549 | context_, >> 550 | write_format, >> 551 | persistent_table_components.metadata_compression_method); >> 552 | writeMetadataFiles(plan, persistent_table_components.path_resolver, object_storage_, context_, sample_block_, write_format, persistent_table_components.table_path); >> 553 | clearOldFiles(object_storage_, old_files); >> 554 | } >> 555 | } 556 | 557 | } ================================================================================ src/Storages/ObjectStorage/DataLakes/Iceberg/FileNamesGenerator.cpp ================================================================================ --- uncovered block 27-32 --- 25 | 26 | FileNamesGenerator::FileNamesGenerator(const FileNamesGenerator & other) >> 27 | { >> 28 | initial_version = other.initial_version; >> 29 | table_location = other.table_location; >> 30 | use_uuid_in_metadata = other.use_uuid_in_metadata; >> 31 | compression_method = other.compression_method; >> 32 | format_name = other.format_name; 33 | } 34 | --- uncovered block 38-38 --- 36 | { 37 | if (this == &other) >> 38 | return *this; 39 | 40 | initial_version = other.initial_version; --- uncovered block 71-71 --- 69 | auto compression_suffix = toContentEncodingName(compression_method); 70 | if (!compression_suffix.empty()) >> 71 | compression_suffix = "." + compression_suffix; 72 | auto used_version = initial_version++; 73 | if (!use_uuid_in_metadata) --- uncovered block 81-89 --- 79 | .compression_method = compression_method}; 80 | } >> 81 | else >> 82 | { >> 83 | auto uuid_str = uuid_generator.createRandom().toString(); >> 84 | return GeneratedMetadataFileWithInfo{ >> 85 | .path = Iceberg::IcebergPathFromMetadata( >> 86 | fmt::format("{}metadata/v{}-{}{}.metadata.json", table_location, used_version, uuid_str, compression_suffix)), >> 87 | .version = used_version, >> 88 | .compression_method = compression_method}; >> 89 | } 90 | } 91 | --- uncovered block 98-101 --- 96 | 97 | Iceberg::IcebergPathFromMetadata FileNamesGenerator::generatePositionDeleteFile() >> 98 | { >> 99 | auto uuid_str = uuid_generator.createRandom().toString(); >> 100 | return Iceberg::IcebergPathFromMetadata(fmt::format("{}data/{}-deletes.{}", table_location, uuid_str, format_name)); >> 101 | } 102 | 103 | } ================================================================================ src/Storages/ObjectStorage/DataLakes/Iceberg/FileNamesGenerator.h ================================================================================ --- uncovered block 52-52 --- 50 | 51 | void setVersion(Int32 initial_version_) { initial_version = initial_version_; } >> 52 | void setCompressionMethod(CompressionMethod compression_method_) { compression_method = compression_method_; } 53 | 54 | private: ================================================================================ src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp ================================================================================ --- uncovered block 66-66 --- 64 | return std::make_shared(header, self, object_storage, format_settings, parser_shared_resources, context_); 65 | else >> 66 | return std::make_shared(header, self, object_storage, format_settings, parser_shared_resources, context_); 67 | } 68 | --- uncovered block 72-72 --- 70 | { 71 | if (Poco::toUpper(info.file_format) != "PARQUET") >> 72 | { 73 | throw Exception( 74 | ErrorCodes::NOT_IMPLEMENTED, --- uncovered block 76-77 --- 74 | ErrorCodes::NOT_IMPLEMENTED, 75 | "Position deletes are only supported for data files of Parquet format in Iceberg, but got {}", >> 76 | info.file_format); >> 77 | } 78 | info.position_deletes_objects.emplace_back( 79 | resolved_storage_path, position_delete_object->parsed_entry->file_format, std::nullopt); WARNING: Failed to get start time for [Print Uncovered Code] - start time and duration won't be set