74.48% (1643/2206) Uncovered changed code (with context): ================================================================================ src/Interpreters/InterpreterSystemQuery.cpp ================================================================================ --- uncovered block 884-886 --- 882 | syncSelectiveMigrations(query); 883 | break; >> 884 | case Type::REPLICA_UNREADY: >> 885 | throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Not implemented"); >> 886 | case Type::REPLICA_READY: 887 | throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Not implemented"); 888 | case Type::SYNC_TRANSACTION_LOG: --- uncovered block 2172-2172 --- 2170 | void InterpreterSystemQuery::syncTransactionLog() 2171 | { >> 2172 | getContext()->checkTransactionsAreAllowed(/* explicit_tcl_query */ true); 2173 | TransactionLog::instance().sync(); 2174 | } --- uncovered block 2463-2464 --- 2461 | required_access.emplace_back(AccessType::SYSTEM_REDUCE_BLOCKING_PARTS); 2462 | else >> 2463 | required_access.emplace_back(AccessType::SYSTEM_REDUCE_BLOCKING_PARTS, query.getDatabase(), query.getTable()); >> 2464 | break; 2465 | } 2466 | case Type::START_SELECTIVE_REBALANCE: --- uncovered block 2466-2466 --- 2464 | break; 2465 | } >> 2466 | case Type::START_SELECTIVE_REBALANCE: 2467 | { 2468 | required_access.emplace_back(AccessType::SYSTEM_START_SELECTIVE_REBALANCE, query.getDatabase(), query.getTable()); --- uncovered block 2468-2469 --- 2466 | case Type::START_SELECTIVE_REBALANCE: 2467 | { >> 2468 | required_access.emplace_back(AccessType::SYSTEM_START_SELECTIVE_REBALANCE, query.getDatabase(), query.getTable()); >> 2469 | break; 2470 | } 2471 | case Type::SYNC_SELECTIVE_MIGRATIONS: --- uncovered block 2471-2471 --- 2469 | break; 2470 | } >> 2471 | case Type::SYNC_SELECTIVE_MIGRATIONS: 2472 | { 2473 | required_access.emplace_back(AccessType::SYSTEM_SYNC_SELECTIVE_MIGRATIONS, query.getDatabase(), query.getTable()); --- uncovered block 2473-2474 --- 2471 | case Type::SYNC_SELECTIVE_MIGRATIONS: 2472 | { >> 2473 | required_access.emplace_back(AccessType::SYSTEM_SYNC_SELECTIVE_MIGRATIONS, query.getDatabase(), query.getTable()); >> 2474 | break; 2475 | } 2476 | case Type::REFRESH_VIEW: --- uncovered block 2476-2478 --- 2474 | break; 2475 | } >> 2476 | case Type::REFRESH_VIEW: >> 2477 | case Type::WAIT_VIEW: >> 2478 | case Type::START_VIEW: 2479 | case Type::START_VIEWS: 2480 | case Type::START_REPLICATED_VIEW: ================================================================================ src/Parsers/ParserSystemQuery.cpp ================================================================================ --- uncovered block 586-586 --- 584 | case Type::SYNC_SELECTIVE_MIGRATIONS: 585 | if (!parseQueryWithOnCluster(res, pos, expected)) >> 586 | return false; 587 | if (!parseDatabaseAndTableAsAST(pos, expected, res->database, res->table)) 588 | return false; --- uncovered block 588-588 --- 586 | return false; 587 | if (!parseDatabaseAndTableAsAST(pos, expected, res->database, res->table)) >> 588 | return false; 589 | break; 590 | ================================================================================ src/Processors/QueryPlan/ReadFromRemote.cpp ================================================================================ --- uncovered block 194-194 --- 192 | { 193 | if (!filter) >> 194 | return; 195 | auto & select_query = query->as(); 196 | auto where_expression = select_query.where(); --- uncovered block 771-771 --- 769 | remote_query_executor->setPoolMode(PoolMode::GET_ONE); 770 | else >> 771 | remote_query_executor->setPoolMode(PoolMode::GET_MANY); 772 | 773 | if (!table_func_ptr) ================================================================================ src/Storages/MergeTree/KeeperReplicaAssignment.cpp ================================================================================ --- uncovered block 44-44 --- 42 | const String & partition_id) const 43 | { >> 44 | String assignment_path = fs::path(zookeeper_path) / "selective" / "assignments" / partition_id; 45 | 46 | String data; --- uncovered block 46-48 --- 44 | String assignment_path = fs::path(zookeeper_path) / "selective" / "assignments" / partition_id; 45 | >> 46 | String data; >> 47 | Coordination::Stat stat; >> 48 | Coordination::Error code; 49 | 50 | bool ok = zk->tryGet(assignment_path, data, &stat, nullptr, &code); --- uncovered block 50-51 --- 48 | Coordination::Error code; 49 | >> 50 | bool ok = zk->tryGet(assignment_path, data, &stat, nullptr, &code); >> 51 | if (ok) 52 | { 53 | Strings assigned = parseAssignment(data); --- uncovered block 53-56 --- 51 | if (ok) 52 | { >> 53 | Strings assigned = parseAssignment(data); >> 54 | if (!assigned.empty()) >> 55 | return assigned; >> 56 | return {}; 57 | } 58 | --- uncovered block 59-60 --- 57 | } 58 | >> 59 | if (code == Coordination::Error::ZNONODE) >> 60 | return {}; 61 | 62 | /// Any other ZK error — propagate to let the caller decide on retry/fallback. --- uncovered block 63-63 --- 61 | 62 | /// Any other ZK error — propagate to let the caller decide on retry/fallback. >> 63 | throw Coordination::Exception::fromPath(code, assignment_path); 64 | } 65 | --- uncovered block 73-73 --- 71 | { 72 | if (all_replicas.empty()) >> 73 | return {}; 74 | 75 | fiu_do_on(FailPoints::selective_replication_assignment_lookup_fail, --- uncovered block 96-96 --- 94 | else if (get_code != Coordination::Error::ZNONODE) 95 | { >> 96 | throw Coordination::Exception::fromPath(get_code, assignment_path); 97 | } 98 | --- uncovered block 153-154 --- 151 | return assigned; 152 | >> 153 | if (code != Coordination::Error::ZNODEEXISTS) >> 154 | throw Coordination::Exception::fromPath(code, assignment_path); 155 | 156 | /// Another replica won the CAS — read its assignment. --- uncovered block 157-159 --- 155 | 156 | /// Another replica won the CAS — read its assignment. >> 157 | String winner_data; >> 158 | Coordination::Stat winner_stat; >> 159 | if (zk->tryGet(assignment_path, winner_data, &winner_stat) && !winner_data.empty()) 160 | { 161 | Strings winner_assigned = parseAssignment(winner_data); --- uncovered block 161-163 --- 159 | if (zk->tryGet(assignment_path, winner_data, &winner_stat) && !winner_data.empty()) 160 | { >> 161 | Strings winner_assigned = parseAssignment(winner_data); >> 162 | if (!winner_assigned.empty()) >> 163 | return winner_assigned; 164 | 165 | LOG_WARNING(getLogger("KeeperReplicaAssignment"), --- uncovered block 165-167 --- 163 | return winner_assigned; 164 | >> 165 | LOG_WARNING(getLogger("KeeperReplicaAssignment"), >> 166 | "Assignment node {} has empty/corrupt data after ZNODEEXISTS, using locally computed assignment", assignment_path); >> 167 | return pick_top_replicas(); 168 | } 169 | --- uncovered block 171-173 --- 169 | 170 | /// Node was deleted between tryCreate and tryGet — retry. >> 171 | LOG_DEBUG(getLogger("KeeperReplicaAssignment"), >> 172 | "Assignment node {} disappeared after ZNODEEXISTS (attempt {}/{}), retrying", >> 173 | assignment_path, attempt + 1, max_retries); 174 | } 175 | --- uncovered block 192-192 --- 190 | LOG_DEBUG(getLogger("KeeperReplicaAssignment"), "Assignment node {} already absent for dropped partition", assignment_path); 191 | else >> 192 | LOG_WARNING(getLogger("KeeperReplicaAssignment"), "Failed to remove assignment node {}: {}", assignment_path, Coordination::errorMessage(code)); 193 | } 194 | --- uncovered block 202-202 --- 200 | auto code = zk->tryGetChildren(assignments_path, children); 201 | if (code == Coordination::Error::ZNONODE) >> 202 | return {}; 203 | if (code != Coordination::Error::ZOK) 204 | { --- uncovered block 205-208 --- 203 | if (code != Coordination::Error::ZOK) 204 | { >> 205 | LOG_WARNING(getLogger("KeeperReplicaAssignment"), >> 206 | "Failed to list assignment children at {}: {}, treating as empty", >> 207 | assignments_path, Coordination::errorMessage(code)); >> 208 | return {}; 209 | } 210 | --- uncovered block 223-223 --- 221 | { 222 | if (results[i].error != Coordination::Error::ZOK) >> 223 | continue; 224 | Strings assigned = parseAssignment(results[i].data); 225 | if (!assigned.empty()) --- uncovered block 249-249 --- 247 | { 248 | if (data.empty()) >> 249 | return {}; 250 | 251 | ReadBufferFromString in(data); --- uncovered block 289-289 --- 287 | bool KeeperReplicaAssignment::hasCloningSuffix(const String & replica_name) 288 | { >> 289 | return replica_name.ends_with(SelectiveReplication::CLONING_SUFFIX); 290 | } 291 | --- uncovered block 294-294 --- 292 | Strings KeeperReplicaAssignment::getAssignedReplicasCached(const String & partition_id) const 293 | { >> 294 | std::lock_guard lock(cache_mutex); 295 | 296 | if (cached.empty()) --- uncovered block 296-297 --- 294 | std::lock_guard lock(cache_mutex); 295 | >> 296 | if (cached.empty()) >> 297 | return {}; 298 | 299 | auto now = std::chrono::steady_clock::now(); --- uncovered block 299-301 --- 297 | return {}; 298 | >> 299 | auto now = std::chrono::steady_clock::now(); >> 300 | if (std::chrono::duration_cast(now - cached.last_refresh) > CACHE_TTL) >> 301 | return {}; 302 | 303 | auto it = cached.partitions.find(partition_id); --- uncovered block 303-305 --- 301 | return {}; 302 | >> 303 | auto it = cached.partitions.find(partition_id); >> 304 | if (it != cached.partitions.end()) >> 305 | return it->second.replicas; 306 | 307 | return {}; --- uncovered block 307-307 --- 305 | return it->second.replicas; 306 | >> 307 | return {}; 308 | } 309 | --- uncovered block 335-335 --- 333 | 334 | if (cached.empty()) >> 335 | return std::nullopt; 336 | 337 | auto now = std::chrono::steady_clock::now(); --- uncovered block 362-362 --- 360 | if (!ok) 361 | { >> 362 | if (code == Coordination::Error::ZNONODE) 363 | { 364 | result.replicas = {}; --- uncovered block 364-366 --- 362 | if (code == Coordination::Error::ZNONODE) 363 | { >> 364 | result.replicas = {}; >> 365 | result.version = -1; >> 366 | return result; 367 | } 368 | --- uncovered block 370-370 --- 368 | 369 | /// Any other ZK error — propagate to let the caller decide on retry/fallback. >> 370 | throw Coordination::Exception::fromPath(code, path); 371 | } 372 | --- uncovered block 389-389 --- 387 | auto age = std::chrono::duration_cast(now - cached.last_refresh); 388 | if (age < CACHE_TTL) >> 389 | return; 390 | } 391 | } --- uncovered block 397-397 --- 395 | bool expected = false; 396 | if (!cache_refreshing.compare_exchange_strong(expected, true)) >> 397 | return; 398 | 399 | SCOPE_EXIT({ cache_refreshing.store(false); }); --- uncovered block 438-440 --- 436 | /// Fast path: cache hit — no ZK read needed. 437 | { >> 438 | std::lock_guard lock(cache_mutex); >> 439 | if (cached.partitions.contains(partition_id)) >> 440 | return; 441 | } 442 | --- uncovered block 444-448 --- 442 | 443 | /// Slow path: single ZK read outside the lock to avoid holding it during RPC. >> 444 | String assignment_path = fs::path(zookeeper_path) / "selective" / "assignments" / partition_id; >> 445 | String data; >> 446 | Coordination::Stat stat; >> 447 | Coordination::Error code; >> 448 | bool ok = zk->tryGet(assignment_path, data, &stat, nullptr, &code); 449 | 450 | std::lock_guard lock(cache_mutex); --- uncovered block 450-451 --- 448 | bool ok = zk->tryGet(assignment_path, data, &stat, nullptr, &code); 449 | >> 450 | std::lock_guard lock(cache_mutex); >> 451 | if (ok) 452 | { 453 | Strings assigned = parseAssignment(data); --- uncovered block 453-454 --- 451 | if (ok) 452 | { >> 453 | Strings assigned = parseAssignment(data); >> 454 | cached.partitions[partition_id] = CachedEntry{std::move(assigned), stat.version}; 455 | } 456 | else if (code == Coordination::Error::ZNONODE) --- uncovered block 456-456 --- 454 | cached.partitions[partition_id] = CachedEntry{std::move(assigned), stat.version}; 455 | } >> 456 | else if (code == Coordination::Error::ZNONODE) 457 | { 458 | /// Partition has no assignment node — record empty entry to avoid repeated ZK reads. --- uncovered block 459-459 --- 457 | { 458 | /// Partition has no assignment node — record empty entry to avoid repeated ZK reads. >> 459 | cached.partitions[partition_id] = CachedEntry{{}, -1}; 460 | } 461 | else --- uncovered block 464-464 --- 462 | { 463 | /// Any other ZK error (timeout, disconnected, etc.) — propagate so caller can retry. >> 464 | throw Coordination::Exception::fromPath(code, assignment_path); 465 | } 466 | cached.last_refresh = std::chrono::steady_clock::now(); --- uncovered block 466-466 --- 464 | throw Coordination::Exception::fromPath(code, assignment_path); 465 | } >> 466 | cached.last_refresh = std::chrono::steady_clock::now(); 467 | } 468 | --- uncovered block 473-473 --- 471 | { 472 | if (partition_ids.empty()) >> 473 | return; 474 | 475 | /// When the partition-cached failpoint is active, skip batch caching so that --- uncovered block 509-509 --- 507 | cached.partitions[missing[i]] = CachedEntry{std::move(assigned), results[i].stat.version}; 508 | } >> 509 | else if (results[i].error == Coordination::Error::ZNONODE) 510 | { 511 | cached.partitions[missing[i]] = CachedEntry{{}, -1}; --- uncovered block 511-511 --- 509 | else if (results[i].error == Coordination::Error::ZNONODE) 510 | { >> 511 | cached.partitions[missing[i]] = CachedEntry{{}, -1}; 512 | } 513 | else --- uncovered block 515-516 --- 513 | else 514 | { >> 515 | LOG_WARNING(log, "ensureBatchCached: ZK error for partition {}: {} — will retry via ensurePartitionCached if needed", >> 516 | missing[i], Coordination::errorMessage(results[i].error)); 517 | } 518 | } ================================================================================ src/Storages/MergeTree/PartitionMigrationCoordinator.cpp ================================================================================ --- uncovered block 78-78 --- 76 | { 77 | if (data.empty()) >> 78 | return false; 79 | 80 | try --- uncovered block 90-90 --- 88 | if (obj->has(key)) 89 | return obj->getValue(key); >> 90 | return {}; 91 | }; 92 | --- uncovered block 110-110 --- 108 | { 109 | /// Corrupt or incompatible JSON format - not an error for caller. >> 110 | return false; 111 | } 112 | } --- uncovered block 119-119 --- 117 | Coordination::Stat stat; 118 | if (!zk->tryGet(migration_path, data, &stat)) >> 119 | return false; 120 | 121 | if (data.empty()) --- uncovered block 122-122 --- 120 | 121 | if (data.empty()) >> 122 | return false; 123 | 124 | return deserialize(data, out); --- uncovered block 135-135 --- 133 | Coordination::Stat stat; 134 | if (!zk->tryGet(migration_path, data, &stat)) >> 135 | return false; 136 | 137 | MigrationMetadata meta; --- uncovered block 139-139 --- 137 | MigrationMetadata meta; 138 | if (!deserialize(data, meta)) >> 139 | return false; 140 | 141 | meta.state = new_state; --- uncovered block 160-160 --- 158 | MigrationMetadata meta; 159 | if (!MigrationMetadata::read(zk, migrations_path + "/" + mid, meta)) >> 160 | continue; 161 | if (meta.state == SelectiveReplication::MIGRATION_STATE_CLONE 162 | || meta.state == SelectiveReplication::MIGRATION_STATE_SWITCH) --- uncovered block 206-207 --- 204 | /// Early abort if ZK session has expired — no point retrying. 205 | if (zk->expired()) >> 206 | throw Exception(ErrorCodes::NO_ZOOKEEPER, >> 207 | "ZK session expired during CAS retry for partition {}", partition_id); 208 | 209 | String data; --- uncovered block 212-212 --- 210 | Coordination::Stat stat; 211 | if (!zk->tryGet(path, data, &stat)) >> 212 | return false; 213 | 214 | Strings replicas = KeeperReplicaAssignment::parseAssignment(data); --- uncovered block 217-217 --- 215 | auto new_replicas = modifier(std::move(replicas)); 216 | if (!new_replicas) >> 217 | return false; 218 | 219 | std::sort(new_replicas->begin(), new_replicas->end()); --- uncovered block 225-225 --- 223 | if (rc == Coordination::Error::ZOK) 224 | return true; >> 225 | if (rc == Coordination::Error::ZBADVERSION) 226 | { 227 | LOG_DEBUG(log, "Assignment CAS failed for partition {} (attempt {}/{}), retrying", --- uncovered block 227-229 --- 225 | if (rc == Coordination::Error::ZBADVERSION) 226 | { >> 227 | LOG_DEBUG(log, "Assignment CAS failed for partition {} (attempt {}/{}), retrying", >> 228 | partition_id, attempt + 1, SelectiveReplication::MAX_MIGRATION_CAS_RETRIES); >> 229 | continue; 230 | } 231 | throw Coordination::Exception::fromPath(rc, path); --- uncovered block 231-231 --- 229 | continue; 230 | } >> 231 | throw Coordination::Exception::fromPath(rc, path); 232 | } 233 | --- uncovered block 234-236 --- 232 | } 233 | >> 234 | throw Exception(ErrorCodes::UNFINISHED, >> 235 | "Failed to update assignment for partition {} after {} CAS retries", >> 236 | partition_id, SelectiveReplication::MAX_MIGRATION_CAS_RETRIES); 237 | } 238 | --- uncovered block 251-251 --- 249 | if (rc != Coordination::Error::ZOK) 250 | throw Exception(ErrorCodes::LOGICAL_ERROR, >> 251 | "Cannot read parts of replica '{}': {}", replica_name, Coordination::errorMessage(rc)); 252 | 253 | LOG_TRACE(log, "readActivePartsForPartition: path={}, replica={}, partition={}, all_parts_count={}", --- uncovered block 274-276 --- 272 | Strings PartitionMigrationCoordinator::parseSnapshot(const String & snapshot_str) 273 | { >> 274 | Strings parts; >> 275 | splitInto<'\n'>(parts, snapshot_str); >> 276 | return parts; 277 | } 278 | --- uncovered block 324-325 --- 322 | { 323 | if (zk->expired()) >> 324 | throw Exception(ErrorCodes::NO_ZOOKEEPER, >> 325 | "ZK session expired during startClone CAS for partition {}", partition_id); 326 | 327 | String asgn_data; --- uncovered block 331-331 --- 329 | if (!zk->tryGet(a_path, asgn_data, &asgn_stat)) 330 | throw Exception(ErrorCodes::LOGICAL_ERROR, >> 331 | "Assignment node missing for partition {} during startClone", partition_id); 332 | 333 | Strings replicas = KeeperReplicaAssignment::parseAssignment(asgn_data); --- uncovered block 352-352 --- 350 | 351 | /// If migration node already exists, another coordinator created it — skip init. >> 352 | if (!responses.empty() && responses[0]->error == Coordination::Error::ZNODEEXISTS) 353 | { 354 | LOG_INFO(log, "startClone: migration node {} already exists, " --- uncovered block 354-356 --- 352 | if (!responses.empty() && responses[0]->error == Coordination::Error::ZNODEEXISTS) 353 | { >> 354 | LOG_INFO(log, "startClone: migration node {} already exists, " >> 355 | "skipping creation", mig_path); >> 356 | return migration_id; 357 | } 358 | --- uncovered block 360-361 --- 358 | 359 | /// Assignment version changed — retry. >> 360 | if (rc == Coordination::Error::ZBADVERSION >> 361 | || (!responses.empty() && responses[1]->error == Coordination::Error::ZBADVERSION)) 362 | { 363 | LOG_DEBUG(log, "startClone: assignment CAS failed for partition {} " --- uncovered block 363-366 --- 361 | || (!responses.empty() && responses[1]->error == Coordination::Error::ZBADVERSION)) 362 | { >> 363 | LOG_DEBUG(log, "startClone: assignment CAS failed for partition {} " >> 364 | "(attempt {}/{}), retrying", >> 365 | partition_id, attempt + 1, SelectiveReplication::MAX_MIGRATION_CAS_RETRIES); >> 366 | continue; 367 | } 368 | --- uncovered block 369-369 --- 367 | } 368 | >> 369 | throw Coordination::Exception::fromPath(rc, mig_path); 370 | } 371 | --- uncovered block 373-376 --- 371 | 372 | if (!initialized) >> 373 | throw Exception(ErrorCodes::UNFINISHED, >> 374 | "startClone: failed to atomically create migration node and update assignment " >> 375 | "for partition {} after {} CAS retries", >> 376 | partition_id, SelectiveReplication::MAX_MIGRATION_CAS_RETRIES); 377 | 378 | /// 5. Write GET_PART entries to the shared replication log. --- uncovered block 414-415 --- 412 | if (!MigrationMetadata::read(zk, mig_path, meta)) 413 | { >> 414 | LOG_DEBUG(log, "Migration {} node not found, skipping", migration_id); >> 415 | return; 416 | } 417 | --- uncovered block 448-449 --- 446 | if (isCloneComplete(zk, migration_id)) 447 | { >> 448 | LOG_INFO(log, "Migration {} CLONE phase complete, advancing to SWITCH", migration_id); >> 449 | if (!MigrationMetadata::updateState(zk, mig_path, SelectiveReplication::MIGRATION_STATE_SWITCH)) 450 | { 451 | LOG_WARNING(log, "Migration {} failed to update state to SWITCH (CAS conflict), retrying next cycle", migration_id); --- uncovered block 451-452 --- 449 | if (!MigrationMetadata::updateState(zk, mig_path, SelectiveReplication::MIGRATION_STATE_SWITCH)) 450 | { >> 451 | LOG_WARNING(log, "Migration {} failed to update state to SWITCH (CAS conflict), retrying next cycle", migration_id); >> 452 | return; 453 | } 454 | meta.state = SelectiveReplication::MIGRATION_STATE_SWITCH; --- uncovered block 454-454 --- 452 | return; 453 | } >> 454 | meta.state = SelectiveReplication::MIGRATION_STATE_SWITCH; 455 | } 456 | else --- uncovered block 462-462 --- 460 | } 461 | >> 462 | if (meta.state == SelectiveReplication::MIGRATION_STATE_SWITCH) 463 | { 464 | /// CAS update assignment: remove source, strip :cloning from target. --- uncovered block 465-466 --- 463 | { 464 | /// CAS update assignment: remove source, strip :cloning from target. >> 465 | bool switch_ok = false; >> 466 | try 467 | { 468 | switch_ok = casUpdateAssignment(zk, meta.partition_id, --- uncovered block 468-469 --- 466 | try 467 | { >> 468 | switch_ok = casUpdateAssignment(zk, meta.partition_id, >> 469 | [&](Strings replicas) -> std::optional 470 | { 471 | Strings new_replicas; --- uncovered block 471-472 --- 469 | [&](Strings replicas) -> std::optional 470 | { >> 471 | Strings new_replicas; >> 472 | for (const auto & r : replicas) 473 | { 474 | String clean = KeeperReplicaAssignment::stripCloningSuffix(r); --- uncovered block 474-478 --- 472 | for (const auto & r : replicas) 473 | { >> 474 | String clean = KeeperReplicaAssignment::stripCloningSuffix(r); >> 475 | if (clean == meta.source_replica) >> 476 | continue; /// Remove source. >> 477 | if (clean == meta.target_replica) >> 478 | new_replicas.push_back(clean); /// Strip :cloning. 479 | else 480 | new_replicas.push_back(r); /// Keep as-is. --- uncovered block 480-480 --- 478 | new_replicas.push_back(clean); /// Strip :cloning. 479 | else >> 480 | new_replicas.push_back(r); /// Keep as-is. 481 | } 482 | return new_replicas; --- uncovered block 482-483 --- 480 | new_replicas.push_back(r); /// Keep as-is. 481 | } >> 482 | return new_replicas; >> 483 | }); 484 | } 485 | catch (const DB::Exception &) --- uncovered block 485-485 --- 483 | }); 484 | } >> 485 | catch (const DB::Exception &) 486 | { 487 | LOG_WARNING(log, "Migration {} SWITCH failed for partition {}, rolling back", --- uncovered block 487-490 --- 485 | catch (const DB::Exception &) 486 | { >> 487 | LOG_WARNING(log, "Migration {} SWITCH failed for partition {}, rolling back", >> 488 | migration_id, meta.partition_id); >> 489 | rollback(zk, migration_id); >> 490 | return; 491 | } 492 | --- uncovered block 493-493 --- 491 | } 492 | >> 493 | if (switch_ok) 494 | { 495 | LOG_INFO(log, "Migration {} SWITCH complete for partition {}", migration_id, meta.partition_id); --- uncovered block 495-496 --- 493 | if (switch_ok) 494 | { >> 495 | LOG_INFO(log, "Migration {} SWITCH complete for partition {}", migration_id, meta.partition_id); >> 496 | ProfileEvents::increment(ProfileEvents::SelectiveReplicationMigrationCompleted); 497 | } 498 | else --- uncovered block 500-503 --- 498 | else 499 | { >> 500 | LOG_WARNING(log, "Migration {} SWITCH failed: assignment disappeared for partition {}, rolling back", >> 501 | migration_id, meta.partition_id); >> 502 | rollback(zk, migration_id); >> 503 | return; 504 | } 505 | --- uncovered block 506-506 --- 504 | } 505 | >> 506 | if (!MigrationMetadata::updateState(zk, mig_path, SelectiveReplication::MIGRATION_STATE_CLEANUP)) 507 | { 508 | LOG_WARNING(log, "Migration {} failed to transition to CLEANUP (CAS conflict), will retry", migration_id); --- uncovered block 508-509 --- 506 | if (!MigrationMetadata::updateState(zk, mig_path, SelectiveReplication::MIGRATION_STATE_CLEANUP)) 507 | { >> 508 | LOG_WARNING(log, "Migration {} failed to transition to CLEANUP (CAS conflict), will retry", migration_id); >> 509 | return; 510 | } 511 | meta.state = SelectiveReplication::MIGRATION_STATE_CLEANUP; --- uncovered block 511-511 --- 509 | return; 510 | } >> 511 | meta.state = SelectiveReplication::MIGRATION_STATE_CLEANUP; 512 | } 513 | --- uncovered block 514-514 --- 512 | } 513 | >> 514 | if (meta.state == SelectiveReplication::MIGRATION_STATE_CLEANUP) 515 | { 516 | if (!MigrationMetadata::updateState(zk, mig_path, SelectiveReplication::MIGRATION_STATE_DONE)) --- uncovered block 516-516 --- 514 | if (meta.state == SelectiveReplication::MIGRATION_STATE_CLEANUP) 515 | { >> 516 | if (!MigrationMetadata::updateState(zk, mig_path, SelectiveReplication::MIGRATION_STATE_DONE)) 517 | { 518 | LOG_WARNING(log, "Migration {} failed to transition to DONE (CAS conflict), will retry", migration_id); --- uncovered block 518-519 --- 516 | if (!MigrationMetadata::updateState(zk, mig_path, SelectiveReplication::MIGRATION_STATE_DONE)) 517 | { >> 518 | LOG_WARNING(log, "Migration {} failed to transition to DONE (CAS conflict), will retry", migration_id); >> 519 | return; 520 | } 521 | /// Remove the clone_complete signal child node. --- uncovered block 522-523 --- 520 | } 521 | /// Remove the clone_complete signal child node. >> 522 | zk->tryRemove(mig_path + "/clone_complete"); >> 523 | LOG_INFO(log, "Migration {} completed (DONE)", migration_id); 524 | } 525 | } --- uncovered block 554-556 --- 552 | catch (const Coordination::Exception & e) 553 | { >> 554 | LOG_WARNING(log, "Rollback: failed to update assignment for partition {} ({}), " >> 555 | "migration node will be preserved for retry in next recovery cycle", >> 556 | meta.partition_id, e.message()); 557 | } 558 | } --- uncovered block 570-571 --- 568 | else 569 | { >> 570 | LOG_WARNING(log, "Migration {} marked FAILED but migration node preserved " >> 571 | "(assignment cleanup pending)", migration_id); 572 | } 573 | ProfileEvents::increment(ProfileEvents::SelectiveReplicationMigrationFailed); --- uncovered block 602-602 --- 600 | MigrationMetadata meta; 601 | if (!MigrationMetadata::read(zk, mig_path, meta)) >> 602 | continue; 603 | 604 | if (meta.state != SelectiveReplication::MIGRATION_STATE_CLONE || meta.target_replica != self_name) --- uncovered block 608-608 --- 606 | 607 | if (zk->exists(mig_path + "/clone_complete")) >> 608 | continue; 609 | 610 | if (meta.source_parts_snapshot.empty()) --- uncovered block 611-611 --- 609 | 610 | if (meta.source_parts_snapshot.empty()) >> 611 | continue; 612 | 613 | const Strings & snapshot_parts = meta.source_parts_snapshot; --- uncovered block 619-619 --- 617 | { 618 | if (zk->tryGetChildren(self_parts_path, our_parts) != Coordination::Error::ZOK) >> 619 | return; 620 | our_parts_loaded = true; 621 | } --- uncovered block 627-629 --- 625 | for (const auto & p : our_parts) 626 | { >> 627 | auto info = MergeTreePartInfo::tryParsePartName(p, storage.format_version); >> 628 | if (info && info->getPartitionId() == meta.partition_id) >> 629 | our_set.add(p); 630 | } 631 | --- uncovered block 645-649 --- 643 | if (all_covered) 644 | { >> 645 | auto code = zk->tryCreate(mig_path + "/clone_complete", "", zkutil::CreateMode::Persistent); >> 646 | if (code == Coordination::Error::ZOK) >> 647 | LOG_INFO(log, "Signaled clone_complete for migration {} (target: {})", mid, self_name); >> 648 | else if (code != Coordination::Error::ZNODEEXISTS) >> 649 | LOG_WARNING(log, "Failed to create clone_complete for migration {}: {}", mid, Coordination::errorMessage(code)); 650 | } 651 | } --- uncovered block 683-683 --- 681 | Strings replicas = KeeperReplicaAssignment::parseAssignment(assignment_data); 682 | if (std::find(replicas.begin(), replicas.end(), cloning_name) == replicas.end()) >> 683 | return false; 684 | 685 | /// Check 2: snapshot should be non-empty. === Lost Baseline Coverage: 58 lines === ================================================================================ src/Storages/MergeTree/MergeTreeData.cpp ================================================================================ --- lost coverage block 3263-3263 --- 3261 | catch (const fs::filesystem_error & e) 3262 | { >> 3263 | if (e.code() == std::errc::no_such_file_or_directory) 3264 | { 3265 | /// If the file is already deleted, do nothing. --- lost coverage block 5132-5132 --- 5130 | (*it_duplicate)->getNameWithState()); 5131 | >> 5132 | if (transaction.txn) 5133 | throw Exception(ErrorCodes::SERIALIZATION_ERROR, "Part {} already exists", (*it_duplicate)->getNameWithState()); 5134 | --- lost coverage block 5135-5135 --- 5133 | throw Exception(ErrorCodes::SERIALIZATION_ERROR, "Part {} already exists", (*it_duplicate)->getNameWithState()); 5134 | >> 5135 | throw Exception(ErrorCodes::DUPLICATE_DATA_PART, "Part {} already exists", (*it_duplicate)->getNameWithState()); 5136 | } 5137 | } --- lost coverage block 7165-7166 --- 7163 | } 7164 | >> 7165 | tryLogException(error, log, >> 7166 | fmt::format("Failed to load part {} at try {} with a retryable error. Will retry in {} ms", part_name, try_no, backoff_ms)); 7167 | 7168 | std::this_thread::sleep_for(std::chrono::milliseconds(backoff_ms)); --- lost coverage block 7168-7169 --- 7166 | fmt::format("Failed to load part {} at try {} with a retryable error. Will retry in {} ms", part_name, try_no, backoff_ms)); 7167 | >> 7168 | std::this_thread::sleep_for(std::chrono::milliseconds(backoff_ms)); >> 7169 | backoff_ms = std::min(backoff_ms * 2, loading_parts_max_backoff_ms); 7170 | } 7171 | --- lost coverage block 9297-9297 --- 9295 | broken_part_callback(data_part->name); 9296 | else >> 9297 | LOG_DEBUG(log, "Will not check potentially broken part {} because it's not active", data_part->getNameWithState()); 9298 | } 9299 | ================================================================================ src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp ================================================================================ --- lost coverage block 549-550 --- 547 | for (const auto & actual_part : entry->replace_range_actual_new_part_names) 548 | { >> 549 | LOG_TEST(log, "Entry {} has actual new part name {}, removing it from mutations", entry->znode_name, actual_part); >> 550 | removeCoveredPartsFromMutations(actual_part, /*remove_part = */ false, /*remove_covered_parts = */ true); 551 | } 552 | --- lost coverage block 1507-1507 --- 1505 | { 1506 | if (!entry->currently_executing) >> 1507 | continue; 1508 | 1509 | const auto virtual_part_names = entry->getVirtualPartNames(format_version); --- lost coverage block 1619-1620 --- 1617 | /// 3. Then we pull log, trying to execute DROP_RANGE all_2_2_0 1618 | /// and reveal that it was incorrectly reordered with MERGE_PARTS all_1_3_1 (drop range intersects merged part). >> 1619 | reject_reason = fmt::format("Log entry for part {} or covering part is not pulled from log to queue yet.", part_name); >> 1620 | return false; 1621 | } 1622 | --- lost coverage block 2072-2072 --- 2070 | auto actual_part_info = MergeTreePartInfo::fromPartName(actual_part_name, queue.format_version); 2071 | for (const auto & other_part_name : entry.replace_range_actual_new_part_names) >> 2072 | if (!MergeTreePartInfo::fromPartName(other_part_name, queue.format_version).isDisjoint(actual_part_info)) 2073 | throw Exception(ErrorCodes::LOGICAL_ERROR, "Entry {} already has actual part {} non-disjoint with {}. This is a bug.", 2074 | entry.actual_new_part_name, other_part_name, actual_part_name); --- lost coverage block 2086-2086 --- 2084 | 2085 | if (entry.type == LogEntry::REPLACE_RANGE) >> 2086 | entry.replace_range_actual_new_part_names.insert(actual_part_name); 2087 | else 2088 | entry.actual_new_part_name = actual_part_name; --- lost coverage block 2132-2132 --- 2130 | 2131 | for (const auto & actual_part : entry->replace_range_actual_new_part_names) >> 2132 | erase_and_check(actual_part); 2133 | 2134 | entry->replace_range_actual_new_part_names.clear(); ================================================================================ src/Storages/StorageReplicatedMergeTree.cpp ================================================================================ --- lost coverage block 1297-1299 --- 1295 | 1296 | if (!zookeeper->tryGet(zookeeper_path + "/replicas", replicas_value, &replicas_stat)) >> 1297 | throw Exception(ErrorCodes::ALL_REPLICAS_LOST, >> 1298 | "Cannot create a replica of the table {}, because the last replica of the table was dropped right now", >> 1299 | zookeeper_path); 1300 | 1301 | /// It is not the first replica, we will mark it as "lost", to immediately repair (clone) from existing replica. --- lost coverage block 2765-2765 --- 2763 | /* try_fetch_shared= */ true)) 2764 | { >> 2765 | return false; 2766 | } 2767 | } --- lost coverage block 3222-3222 --- 3220 | if (replica.empty()) 3221 | { >> 3222 | LOG_DEBUG(log, "Part {} is not found on remote replicas", part_desc->new_part_name); 3223 | 3224 | /// Fallback to covering part --- lost coverage block 3225-3225 --- 3223 | 3224 | /// Fallback to covering part >> 3225 | replica = findReplicaHavingCoveringPartImplLowLevel(&entry, part_desc->new_part_name, found_part_name, true); 3226 | 3227 | if (replica.empty()) --- lost coverage block 3227-3227 --- 3225 | replica = findReplicaHavingCoveringPartImplLowLevel(&entry, part_desc->new_part_name, found_part_name, true); 3226 | >> 3227 | if (replica.empty()) 3228 | { 3229 | /// It is not fail, since adjacent parts could cover current part --- lost coverage block 3230-3231 --- 3228 | { 3229 | /// It is not fail, since adjacent parts could cover current part >> 3230 | LOG_DEBUG(log, "Parts covering {} are not found on remote replicas", part_desc->new_part_name); >> 3231 | continue; 3232 | } 3233 | --- lost coverage block 3234-3234 --- 3232 | } 3233 | >> 3234 | LOG_TRACE(log, "Found part {} covering {} on replica {}", found_part_name, part_desc->new_part_name, replica); 3235 | } 3236 | else --- lost coverage block 3437-3437 --- 3435 | catch (...) 3436 | { >> 3437 | PartLog::addNewParts(getContext(), PartLog::createPartLogEntries(res_parts, watch.elapsed()), ExecutionStatus::fromCurrentException("", true)); 3438 | 3439 | for (const auto & res_part : res_parts) --- lost coverage block 3439-3440 --- 3437 | PartLog::addNewParts(getContext(), PartLog::createPartLogEntries(res_parts, watch.elapsed()), ExecutionStatus::fromCurrentException("", true)); 3438 | >> 3439 | for (const auto & res_part : res_parts) >> 3440 | unlockSharedData(*res_part); 3441 | 3442 | throw; --- lost coverage block 3442-3442 --- 3440 | unlockSharedData(*res_part); 3441 | >> 3442 | throw; 3443 | } 3444 | --- lost coverage block 4404-4405 --- 4402 | /// Note that we need to check shutdown_prepared_called, not shutdown_called, since the table will be marked as readonly 4403 | /// after calling StorageReplicatedMergeTree::flushAndPrepareForShutdown(). >> 4404 | if (shutdown_prepared_called) >> 4405 | return AttemptStatus::CannotSelect; 4406 | throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode (replica path: {}), cannot assign new merges", replica_path); 4407 | } --- lost coverage block 4827-4827 --- 4825 | else 4826 | { >> 4827 | zkutil::KeeperMultiException::check(code, ops, responses); 4828 | } 4829 | --- lost coverage block 5427-5427 --- 5425 | break; 5426 | } >> 5427 | if (code == Coordination::Error::ZNONODE) 5428 | { 5429 | /// The quorum has already been achieved. --- lost coverage block 5430-5430 --- 5428 | { 5429 | /// The quorum has already been achieved. >> 5430 | break; 5431 | } 5432 | if (code == Coordination::Error::ZBADVERSION) --- lost coverage block 5577-5577 --- 5575 | if (auto part = getPartIfExists(part_info, {MergeTreeDataPartState::Outdated, MergeTreeDataPartState::Deleting})) 5576 | { >> 5577 | LOG_DEBUG(log, "Part {} should be deleted after previous attempt before fetch", part->name); 5578 | /// Force immediate parts cleanup to delete the part that was left from the previous fetch attempt. 5579 | cleanup_thread.wakeup(); --- lost coverage block 5579-5580 --- 5577 | LOG_DEBUG(log, "Part {} should be deleted after previous attempt before fetch", part->name); 5578 | /// Force immediate parts cleanup to delete the part that was left from the previous fetch attempt. >> 5579 | cleanup_thread.wakeup(); >> 5580 | return false; 5581 | } 5582 | } --- lost coverage block 7284-7285 --- 7282 | 7283 | assert(try_no == max_retries); >> 7284 | static constexpr const char * message_fmt = "Can't create merge queue node in ZooKeeper, because log was updated in every of {} tries"; >> 7285 | return handle_noop(message_fmt, try_no); 7286 | }; 7287 | --- lost coverage block 7308-7308 --- 7306 | assigned = try_assign_merge(partition_id); 7307 | if (!assigned) >> 7308 | break; 7309 | } 7310 | } --- lost coverage block 9606-9606 --- 9604 | 9605 | if (result.error == Coordination::Error::ZNONODE) >> 9606 | continue; 9607 | 9608 | ReadBufferFromString buf(result.data); --- lost coverage block 10616-10618 --- 10614 | if (partial_shutdown_called) 10615 | { >> 10616 | was_interrupted = true; >> 10617 | target_entry_event.set(); >> 10618 | return; 10619 | } 10620 | --- lost coverage block 10642-10642 --- 10640 | 10641 | if (was_interrupted) >> 10642 | throw Exception(ErrorCodes::ABORTED, "Shutdown is called for table"); 10643 | 10644 | return true; --- lost coverage block 10754-10758 --- 10752 | if (rc == Coordination::Error::ZNONODE) 10753 | { >> 10754 | LOG_TRACE( >> 10755 | log, >> 10756 | "Other replica already removing same part {} or part deduplication node was removed by background thread. Retry.", >> 10757 | part_name); >> 10758 | continue; 10759 | } 10760 | zkutil::KeeperMultiException::check(rc, ops, responses); --- lost coverage block 12083-12085 --- 12081 | else 12082 | { >> 12083 | throw Exception(ErrorCodes::DUPLICATE_DATA_PART, >> 12084 | "Part {} already exists on replica {} on path {}", >> 12085 | lost_part_name, replica, current_part_path); 12086 | } 12087 | } --- lost coverage block 12095-12095 --- 12093 | new_data_part, /*hardlinked_files*/ {}, /*replace_zero_copy_lock*/ true, ops, num_check_ops_unused); 12094 | if (part_found) >> 12095 | throw Exception(ErrorCodes::DUPLICATE_DATA_PART, "Found part on another replica, probably it was already replaced"); 12096 | 12097 | /// Increment lost_part_count --- lost coverage block 12120-12120 --- 12118 | break; 12119 | } >> 12120 | if (code == Coordination::Error::ZBADVERSION) 12121 | { 12122 | LOG_INFO(log, "Looks like log was updated or new replica appeared while creating new empty part, will retry"); --- lost coverage block 12126-12126 --- 12124 | else 12125 | { >> 12126 | zkutil::KeeperMultiException::check(code, ops, responses); 12127 | } 12128 | } --- lost coverage block 12495-12495 --- 12493 | getStatus(status, /* with_zk_fields = */ false); 12494 | if (status.queue.inserts_in_queue) >> 12495 | empty = false; 12496 | } 12497 | auto backup = restorer.getBackup(); WARNING: Failed to get start time for [Print Uncovered Code] - start time and duration won't be set --- Coverage counts --- Lines : baseline 745,882/887,447 -> current 748,487/890,076 (delta +2,605 / +2,629) Functions : baseline 803,454/883,823 -> current 803,653/884,119 (delta +199 / +296) Branches : baseline 241,833/316,048 -> current 242,776/317,134 (delta +943 / +1,086)