CodeCommitsIssuesPull requestsActionsInsightsSecurity
4043be31211a958d6697a008f55f4280c538b18c

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

programs/copier/ClusterCopier.cpp

1895lines · modecode

1#include "ClusterCopier.h"
2
3#include "Internals.h"
4
5#include <Common/ZooKeeper/ZooKeeper.h>
6#include <Common/ZooKeeper/KeeperException.h>
7#include <Common/setThreadName.h>
8
9
10namespace DB
11{
12
/// Error codes used by the exceptions thrown in this file; only declared here (extern).
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;
    extern const int UNFINISHED;
}
20
21
22void ClusterCopier::init()
23{
24 auto zookeeper = context.getZooKeeper();
25
26 task_description_watch_callback = [this] (const Coordination::WatchResponse & response)
27 {
28 if (response.error != Coordination::Error::ZOK)
29 return;
30 UInt64 version = ++task_description_version;
31 LOG_DEBUG(log, "Task description should be updated, local version {}", version);
32 };
33
34 task_description_path = task_zookeeper_path + "/description";
35 task_cluster = std::make_unique<TaskCluster>(task_zookeeper_path, working_database_name);
36
37 reloadTaskDescription();
38 task_cluster_initial_config = task_cluster_current_config;
39
40 task_cluster->loadTasks(*task_cluster_initial_config);
41 context.setClustersConfig(task_cluster_initial_config, task_cluster->clusters_prefix);
42
43 /// Set up shards and their priority
44 task_cluster->random_engine.seed(task_cluster->random_device());
45 for (auto & task_table : task_cluster->table_tasks)
46 {
47 task_table.cluster_pull = context.getCluster(task_table.cluster_pull_name);
48 task_table.cluster_push = context.getCluster(task_table.cluster_push_name);
49 task_table.initShards(task_cluster->random_engine);
50 }
51
52 LOG_DEBUG(log, "Will process {} table tasks", task_cluster->table_tasks.size());
53
54 /// Do not initialize tables, will make deferred initialization in process()
55
56 zookeeper->createAncestors(getWorkersPathVersion() + "/");
57 zookeeper->createAncestors(getWorkersPath() + "/");
58}
59
60template <typename T>
61decltype(auto) ClusterCopier::retry(T && func, UInt64 max_tries)
62{
63 std::exception_ptr exception;
64
65 for (UInt64 try_number = 1; try_number <= max_tries; ++try_number)
66 {
67 try
68 {
69 return func();
70 }
71 catch (...)
72 {
73 exception = std::current_exception();
74 if (try_number < max_tries)
75 {
76 tryLogCurrentException(log, "Will retry");
77 std::this_thread::sleep_for(default_sleep_time);
78 }
79 }
80 }
81
82 std::rethrow_exception(exception);
83}
84
85
86void ClusterCopier::discoverShardPartitions(const ConnectionTimeouts & timeouts, const TaskShardPtr & task_shard)
87{
88 TaskTable & task_table = task_shard->task_table;
89
90 LOG_INFO(log, "Discover partitions of shard {}", task_shard->getDescription());
91
92 auto get_partitions = [&] () { return getShardPartitions(timeouts, *task_shard); };
93 auto existing_partitions_names = retry(get_partitions, 60);
94 Strings filtered_partitions_names;
95 Strings missing_partitions;
96
97 /// Check that user specified correct partition names
98 auto check_partition_format = [] (const DataTypePtr & type, const String & partition_text_quoted)
99 {
100 MutableColumnPtr column_dummy = type->createColumn();
101 ReadBufferFromString rb(partition_text_quoted);
102
103 try
104 {
105 type->deserializeAsTextQuoted(*column_dummy, rb, FormatSettings());
106 }
107 catch (Exception & e)
108 {
109 throw Exception("Partition " + partition_text_quoted + " has incorrect format. " + e.displayText(), ErrorCodes::BAD_ARGUMENTS);
110 }
111 };
112
113 if (task_table.has_enabled_partitions)
114 {
115 /// Process partition in order specified by <enabled_partitions/>
116 for (const String & partition_name : task_table.enabled_partitions)
117 {
118 /// Check that user specified correct partition names
119 check_partition_format(task_shard->partition_key_column.type, partition_name);
120
121 auto it = existing_partitions_names.find(partition_name);
122
123 /// Do not process partition if it is not in enabled_partitions list
124 if (it == existing_partitions_names.end())
125 {
126 missing_partitions.emplace_back(partition_name);
127 continue;
128 }
129
130 filtered_partitions_names.emplace_back(*it);
131 }
132
133 for (const String & partition_name : existing_partitions_names)
134 {
135 if (!task_table.enabled_partitions_set.count(partition_name))
136 {
137 LOG_DEBUG(log, "Partition {} will not be processed, since it is not in enabled_partitions of {}", partition_name, task_table.table_id);
138 }
139 }
140 }
141 else
142 {
143 for (const String & partition_name : existing_partitions_names)
144 filtered_partitions_names.emplace_back(partition_name);
145 }
146
147 for (const String & partition_name : filtered_partitions_names)
148 {
149 const size_t number_of_splits = task_table.number_of_splits;
150 task_shard->partition_tasks.emplace(partition_name, ShardPartition(*task_shard, partition_name, number_of_splits));
151 task_shard->checked_partitions.emplace(partition_name, true);
152
153 auto shard_partition_it = task_shard->partition_tasks.find(partition_name);
154 PartitionPieces & shard_partition_pieces = shard_partition_it->second.pieces;
155
156 for (size_t piece_number = 0; piece_number < number_of_splits; ++piece_number)
157 {
158 bool res = checkPresentPartitionPiecesOnCurrentShard(timeouts, *task_shard, partition_name, piece_number);
159 shard_partition_pieces.emplace_back(shard_partition_it->second, piece_number, res);
160 }
161 }
162
163 if (!missing_partitions.empty())
164 {
165 std::stringstream ss;
166 for (const String & missing_partition : missing_partitions)
167 ss << " " << missing_partition;
168
169 LOG_WARNING(log, "There are no {} partitions from enabled_partitions in shard {} :{}", missing_partitions.size(), task_shard->getDescription(), ss.str());
170 }
171
172 LOG_DEBUG(log, "Will copy {} partitions from shard {}", task_shard->partition_tasks.size(), task_shard->getDescription());
173}
174
175void ClusterCopier::discoverTablePartitions(const ConnectionTimeouts & timeouts, TaskTable & task_table, UInt64 num_threads)
176{
177 /// Fetch partitions list from a shard
178 {
179 ThreadPool thread_pool(num_threads ? num_threads : 2 * getNumberOfPhysicalCPUCores());
180
181 for (const TaskShardPtr & task_shard : task_table.all_shards)
182 thread_pool.scheduleOrThrowOnError([this, timeouts, task_shard]()
183 {
184 setThreadName("DiscoverPartns");
185 discoverShardPartitions(timeouts, task_shard);
186 });
187
188 LOG_DEBUG(log, "Waiting for {} setup jobs", thread_pool.active());
189 thread_pool.wait();
190 }
191}
192
193void ClusterCopier::uploadTaskDescription(const std::string & task_path, const std::string & task_file, const bool force)
194{
195 auto local_task_description_path = task_path + "/description";
196
197 String task_config_str;
198 {
199 ReadBufferFromFile in(task_file);
200 readStringUntilEOF(task_config_str, in);
201 }
202 if (task_config_str.empty())
203 return;
204
205 auto zookeeper = context.getZooKeeper();
206
207 zookeeper->createAncestors(local_task_description_path);
208 auto code = zookeeper->tryCreate(local_task_description_path, task_config_str, zkutil::CreateMode::Persistent);
209 if (code != Coordination::Error::ZOK && force)
210 zookeeper->createOrUpdate(local_task_description_path, task_config_str, zkutil::CreateMode::Persistent);
211
212 LOG_DEBUG(log, "Task description {} uploaded to {} with result {} ({})",
213 ((code != Coordination::Error::ZOK && !force) ? "not " : ""), local_task_description_path, code, Coordination::errorMessage(code));
214}
215
216void ClusterCopier::reloadTaskDescription()
217{
218 auto zookeeper = context.getZooKeeper();
219 task_description_watch_zookeeper = zookeeper;
220
221 String task_config_str;
222 Coordination::Stat stat{};
223 Coordination::Error code;
224
225 zookeeper->tryGetWatch(task_description_path, task_config_str, &stat, task_description_watch_callback, &code);
226 if (code != Coordination::Error::ZOK)
227 throw Exception("Can't get description node " + task_description_path, ErrorCodes::BAD_ARGUMENTS);
228
229 LOG_DEBUG(log, "Loading description, zxid={}", task_description_current_stat.czxid);
230 auto config = getConfigurationFromXMLString(task_config_str);
231
232 /// Setup settings
233 task_cluster->reloadSettings(*config);
234 context.setSettings(task_cluster->settings_common);
235
236 task_cluster_current_config = config;
237 task_description_current_stat = stat;
238}
239
240void ClusterCopier::updateConfigIfNeeded()
241{
242 UInt64 version_to_update = task_description_version;
243 bool is_outdated_version = task_description_current_version != version_to_update;
244 bool is_expired_session = !task_description_watch_zookeeper || task_description_watch_zookeeper->expired();
245
246 if (!is_outdated_version && !is_expired_session)
247 return;
248
249 LOG_DEBUG(log, "Updating task description");
250 reloadTaskDescription();
251
252 task_description_current_version = version_to_update;
253}
254
255void ClusterCopier::process(const ConnectionTimeouts & timeouts)
256{
257 for (TaskTable & task_table : task_cluster->table_tasks)
258 {
259 LOG_INFO(log, "Process table task {} with {} shards, {} of them are local ones", task_table.table_id, task_table.all_shards.size(), task_table.local_shards.size());
260
261 if (task_table.all_shards.empty())
262 continue;
263
264 /// Discover partitions of each shard and total set of partitions
265 if (!task_table.has_enabled_partitions)
266 {
267 /// If there are no specified enabled_partitions, we must discover them manually
268 discoverTablePartitions(timeouts, task_table);
269
270 /// After partitions of each shard are initialized, initialize cluster partitions
271 for (const TaskShardPtr & task_shard : task_table.all_shards)
272 {
273 for (const auto & partition_elem : task_shard->partition_tasks)
274 {
275 const String & partition_name = partition_elem.first;
276 task_table.cluster_partitions.emplace(partition_name, ClusterPartition{});
277 }
278 }
279
280 for (auto & partition_elem : task_table.cluster_partitions)
281 {
282 const String & partition_name = partition_elem.first;
283
284 for (const TaskShardPtr & task_shard : task_table.all_shards)
285 task_shard->checked_partitions.emplace(partition_name);
286
287 task_table.ordered_partition_names.emplace_back(partition_name);
288 }
289 }
290 else
291 {
292 /// If enabled_partitions are specified, assume that each shard has all partitions
293 /// We will refine partition set of each shard in future
294
295 for (const String & partition_name : task_table.enabled_partitions)
296 {
297 task_table.cluster_partitions.emplace(partition_name, ClusterPartition{});
298 task_table.ordered_partition_names.emplace_back(partition_name);
299 }
300 }
301
302 task_table.watch.restart();
303
304 /// Retry table processing
305 bool table_is_done = false;
306 for (UInt64 num_table_tries = 0; num_table_tries < max_table_tries; ++num_table_tries)
307 {
308 if (tryProcessTable(timeouts, task_table))
309 {
310 table_is_done = true;
311 break;
312 }
313 }
314
315 /// Delete helping tables in both cases (whole table is done or not)
316 dropHelpingTables(task_table);
317
318 if (!table_is_done)
319 {
320 throw Exception("Too many tries to process table " + task_table.table_id + ". Abort remaining execution",
321 ErrorCodes::UNFINISHED);
322 }
323 }
324}
325
326/// Protected section
327
328
329/*
330 * Creates task worker node and checks maximum number of workers not to exceed the limit.
331 * To achieve this we have to check version of workers_version_path node and create current_worker_path
332 * node atomically.
333 * */
334
335zkutil::EphemeralNodeHolder::Ptr ClusterCopier::createTaskWorkerNodeAndWaitIfNeed(
336 const zkutil::ZooKeeperPtr & zookeeper,
337 const String & description,
338 bool unprioritized)
339{
340 std::chrono::milliseconds current_sleep_time = default_sleep_time;
341 static constexpr std::chrono::milliseconds max_sleep_time(30000); // 30 sec
342
343 if (unprioritized)
344 std::this_thread::sleep_for(current_sleep_time);
345
346 String workers_version_path = getWorkersPathVersion();
347 String workers_path = getWorkersPath();
348 String current_worker_path = getCurrentWorkerNodePath();
349
350 UInt64 num_bad_version_errors = 0;
351
352 while (true)
353 {
354 updateConfigIfNeeded();
355
356 Coordination::Stat stat;
357 zookeeper->get(workers_version_path, &stat);
358 auto version = stat.version;
359 zookeeper->get(workers_path, &stat);
360
361 if (static_cast<UInt64>(stat.numChildren) >= task_cluster->max_workers)
362 {
363 LOG_DEBUG(log, "Too many workers ({}, maximum {}). Postpone processing {}", stat.numChildren, task_cluster->max_workers, description);
364
365 if (unprioritized)
366 current_sleep_time = std::min(max_sleep_time, current_sleep_time + default_sleep_time);
367
368 std::this_thread::sleep_for(current_sleep_time);
369 num_bad_version_errors = 0;
370 }
371 else
372 {
373 Coordination::Requests ops;
374 ops.emplace_back(zkutil::makeSetRequest(workers_version_path, description, version));
375 ops.emplace_back(zkutil::makeCreateRequest(current_worker_path, description, zkutil::CreateMode::Ephemeral));
376 Coordination::Responses responses;
377 auto code = zookeeper->tryMulti(ops, responses);
378
379 if (code == Coordination::Error::ZOK || code == Coordination::Error::ZNODEEXISTS)
380 return std::make_shared<zkutil::EphemeralNodeHolder>(current_worker_path, *zookeeper, false, false, description);
381
382 if (code == Coordination::Error::ZBADVERSION)
383 {
384 ++num_bad_version_errors;
385
386 /// Try to make fast retries
387 if (num_bad_version_errors > 3)
388 {
389 LOG_DEBUG(log, "A concurrent worker has just been added, will check free worker slots again");
390 std::chrono::milliseconds random_sleep_time(std::uniform_int_distribution<int>(1, 1000)(task_cluster->random_engine));
391 std::this_thread::sleep_for(random_sleep_time);
392 num_bad_version_errors = 0;
393 }
394 }
395 else
396 throw Coordination::Exception(code);
397 }
398 }
399}
400
401
402bool ClusterCopier::checkPartitionPieceIsClean(
403 const zkutil::ZooKeeperPtr & zookeeper,
404 const CleanStateClock & clean_state_clock,
405 const String & task_status_path)
406{
407 LogicalClock task_start_clock;
408
409 Coordination::Stat stat{};
410 if (zookeeper->exists(task_status_path, &stat))
411 task_start_clock = LogicalClock(stat.mzxid);
412
413 return clean_state_clock.is_clean() && (!task_start_clock.hasHappened() || clean_state_clock.discovery_zxid <= task_start_clock);
414}
415
416
417bool ClusterCopier::checkAllPiecesInPartitionAreDone(const TaskTable & task_table, const String & partition_name, const TasksShard & shards_with_partition)
418{
419 bool answer = true;
420 for (size_t piece_number = 0; piece_number < task_table.number_of_splits; ++piece_number)
421 {
422 bool piece_is_done = checkPartitionPieceIsDone(task_table, partition_name, piece_number, shards_with_partition);
423 if (!piece_is_done)
424 LOG_DEBUG(log, "Partition {} piece {} is not already done.", partition_name, piece_number);
425 answer &= piece_is_done;
426 }
427
428 return answer;
429}
430
431
432/* The same as function above
433 * Assume that we don't know on which shards do we have partition certain piece.
434 * We'll check them all (I mean shards that contain the whole partition)
435 * And shards that don't have certain piece MUST mark that piece is_done true.
436 * */
437bool ClusterCopier::checkPartitionPieceIsDone(const TaskTable & task_table, const String & partition_name,
438 size_t piece_number, const TasksShard & shards_with_partition)
439{
440 LOG_DEBUG(log, "Check that all shards processed partition {} piece {} successfully", partition_name, piece_number);
441
442 auto zookeeper = context.getZooKeeper();
443
444 /// Collect all shards that contain partition piece number piece_number.
445 Strings piece_status_paths;
446 for (const auto & shard : shards_with_partition)
447 {
448 ShardPartition & task_shard_partition = shard->partition_tasks.find(partition_name)->second;
449 ShardPartitionPiece & shard_partition_piece = task_shard_partition.pieces[piece_number];
450 piece_status_paths.emplace_back(shard_partition_piece.getShardStatusPath());
451 }
452
453 std::vector<int64_t> zxid1, zxid2;
454
455 try
456 {
457 std::vector<zkutil::ZooKeeper::FutureGet> get_futures;
458 for (const String & path : piece_status_paths)
459 get_futures.emplace_back(zookeeper->asyncGet(path));
460
461 // Check that state is Finished and remember zxid
462 for (auto & future : get_futures)
463 {
464 auto res = future.get();
465
466 TaskStateWithOwner status = TaskStateWithOwner::fromString(res.data);
467 if (status.state != TaskState::Finished)
468 {
469 LOG_INFO(log, "The task {} is being rewritten by {}. Partition piece will be rechecked", res.data, status.owner);
470 return false;
471 }
472
473 zxid1.push_back(res.stat.pzxid);
474 }
475
476 const String piece_is_dirty_flag_path = task_table.getCertainPartitionPieceIsDirtyPath(partition_name, piece_number);
477 const String piece_is_dirty_cleaned_path = task_table.getCertainPartitionPieceIsCleanedPath(partition_name, piece_number);
478 const String piece_task_status_path = task_table.getCertainPartitionPieceTaskStatusPath(partition_name, piece_number);
479
480 CleanStateClock clean_state_clock (zookeeper, piece_is_dirty_flag_path, piece_is_dirty_cleaned_path);
481
482 const bool is_clean = checkPartitionPieceIsClean(zookeeper, clean_state_clock, piece_task_status_path);
483
484
485 if (!is_clean)
486 {
487 LOG_INFO(log, "Partition {} become dirty", partition_name);
488 return false;
489 }
490
491 get_futures.clear();
492 for (const String & path : piece_status_paths)
493 get_futures.emplace_back(zookeeper->asyncGet(path));
494
495 // Remember zxid of states again
496 for (auto & future : get_futures)
497 {
498 auto res = future.get();
499 zxid2.push_back(res.stat.pzxid);
500 }
501 }
502 catch (const Coordination::Exception & e)
503 {
504 LOG_INFO(log, "A ZooKeeper error occurred while checking partition {} piece number {}. Will recheck the partition. Error: {}", partition_name, toString(piece_number), e.displayText());
505 return false;
506 }
507
508 // If all task is finished and zxid is not changed then partition could not become dirty again
509 for (UInt64 shard_num = 0; shard_num < piece_status_paths.size(); ++shard_num)
510 {
511 if (zxid1[shard_num] != zxid2[shard_num])
512 {
513 LOG_INFO(log, "The task {} is being modified now. Partition piece will be rechecked", piece_status_paths[shard_num]);
514 return false;
515 }
516 }
517
518 LOG_INFO(log, "Partition {} piece number {} is copied successfully", partition_name, toString(piece_number));
519 return true;
520}
521
522
523TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & task_table, const String & partition_name)
524{
525 bool inject_fault = false;
526 if (move_fault_probability > 0)
527 {
528 double value = std::uniform_real_distribution<>(0, 1)(task_table.task_cluster.random_engine);
529 inject_fault = value < move_fault_probability;
530 }
531
532 LOG_DEBUG(log, "Try to move {} to destination table", partition_name);
533
534 auto zookeeper = context.getZooKeeper();
535
536 const auto current_partition_attach_is_active = task_table.getPartitionAttachIsActivePath(partition_name);
537 const auto current_partition_attach_is_done = task_table.getPartitionAttachIsDonePath(partition_name);
538
539 /// Create ephemeral node to mark that we are active and process the partition
540 zookeeper->createAncestors(current_partition_attach_is_active);
541 zkutil::EphemeralNodeHolderPtr partition_attach_node_holder;
542 try
543 {
544 partition_attach_node_holder = zkutil::EphemeralNodeHolder::create(current_partition_attach_is_active, *zookeeper, host_id);
545 }
546 catch (const Coordination::Exception & e)
547 {
548 if (e.code == Coordination::Error::ZNODEEXISTS)
549 {
550 LOG_DEBUG(log, "Someone is already moving pieces {}", current_partition_attach_is_active);
551 return TaskStatus::Active;
552 }
553
554 throw;
555 }
556
557
558 /// Exit if task has been already processed;
559 /// create blocking node to signal cleaning up if it is abandoned
560 {
561 String status_data;
562 if (zookeeper->tryGet(current_partition_attach_is_done, status_data))
563 {
564 TaskStateWithOwner status = TaskStateWithOwner::fromString(status_data);
565 if (status.state == TaskState::Finished)
566 {
567 LOG_DEBUG(log, "All pieces for partition from this task {} has been successfully moved to destination table by {}", current_partition_attach_is_active, status.owner);
568 return TaskStatus::Finished;
569 }
570
571 /// Task is abandoned, because previously we created ephemeral node, possibly in other copier's process.
572 /// Initialize DROP PARTITION
573 LOG_DEBUG(log, "Moving piece for partition {} has not been successfully finished by {}. Will try to move by myself.", current_partition_attach_is_active, status.owner);
574
575 /// Remove is_done marker.
576 zookeeper->remove(current_partition_attach_is_done);
577 }
578 }
579
580
581 /// Try start processing, create node about it
582 {
583 String start_state = TaskStateWithOwner::getData(TaskState::Started, host_id);
584 zookeeper->create(current_partition_attach_is_done, start_state, zkutil::CreateMode::Persistent);
585 }
586
587 /// Move partition to original destination table.
588 for (size_t current_piece_number = 0; current_piece_number < task_table.number_of_splits; ++current_piece_number)
589 {
590 LOG_DEBUG(log, "Trying to move partition {} piece {} to original table", partition_name, toString(current_piece_number));
591
592 ASTPtr query_alter_ast;
593 String query_alter_ast_string;
594
595 DatabaseAndTableName original_table = task_table.table_push;
596 DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first,
597 original_table.second + "_piece_" +
598 toString(current_piece_number));
599
600 Settings settings_push = task_cluster->settings_push;
601
602 /// It is important, ALTER ATTACH PARTITION must be done synchronously
603 /// And we will execute this ALTER query on each replica of a shard.
604 /// It is correct, because this query is idempotent.
605 settings_push.replication_alter_partitions_sync = 2;
606
607 query_alter_ast_string += " ALTER TABLE " + getQuotedTable(original_table) +
608 " ATTACH PARTITION " + partition_name +
609 " FROM " + getQuotedTable(helping_table);
610
611 LOG_DEBUG(log, "Executing ALTER query: {}", query_alter_ast_string);
612
613 try
614 {
615 size_t num_nodes = executeQueryOnCluster(
616 task_table.cluster_push,
617 query_alter_ast_string,
618 settings_push,
619 PoolMode::GET_MANY,
620 ClusterExecutionMode::ON_EACH_NODE);
621
622 LOG_INFO(log, "Number of nodes that executed ALTER query successfully : {}", toString(num_nodes));
623 }
624 catch (...)
625 {
626 LOG_DEBUG(log, "Error while moving partition {} piece {} to original table", partition_name, toString(current_piece_number));
627 throw;
628 }
629
630 if (inject_fault)
631 throw Exception("Copy fault injection is activated", ErrorCodes::UNFINISHED);
632
633 try
634 {
635 String query_deduplicate_ast_string;
636 if (!task_table.isReplicatedTable())
637 {
638 query_deduplicate_ast_string += " OPTIMIZE TABLE " + getQuotedTable(original_table) +
639 " PARTITION " + partition_name + " DEDUPLICATE;";
640
641 LOG_DEBUG(log, "Executing OPTIMIZE DEDUPLICATE query: {}", query_alter_ast_string);
642
643 UInt64 num_nodes = executeQueryOnCluster(
644 task_table.cluster_push,
645 query_deduplicate_ast_string,
646 task_cluster->settings_push,
647 PoolMode::GET_MANY);
648
649 LOG_INFO(log, "Number of shard that executed OPTIMIZE DEDUPLICATE query successfully : {}", toString(num_nodes));
650 }
651 }
652 catch (...)
653 {
654 LOG_DEBUG(log, "Error while executing OPTIMIZE DEDUPLICATE partition {}in the original table", partition_name);
655 throw;
656 }
657 }
658
659 /// Create node to signal that we finished moving
660 {
661 String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
662 zookeeper->set(current_partition_attach_is_done, state_finished, 0);
663 }
664
665 return TaskStatus::Finished;
666}
667
668/// Removes MATERIALIZED and ALIAS columns from create table query
669ASTPtr ClusterCopier::removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast)
670{
671 const ASTs & column_asts = query_ast->as<ASTCreateQuery &>().columns_list->columns->children;
672 auto new_columns = std::make_shared<ASTExpressionList>();
673
674 for (const ASTPtr & column_ast : column_asts)
675 {
676 const auto & column = column_ast->as<ASTColumnDeclaration &>();
677
678 if (!column.default_specifier.empty())
679 {
680 ColumnDefaultKind kind = columnDefaultKindFromString(column.default_specifier);
681 if (kind == ColumnDefaultKind::Materialized || kind == ColumnDefaultKind::Alias)
682 continue;
683 }
684
685 new_columns->children.emplace_back(column_ast->clone());
686 }
687
688 ASTPtr new_query_ast = query_ast->clone();
689 auto & new_query = new_query_ast->as<ASTCreateQuery &>();
690
691 auto new_columns_list = std::make_shared<ASTColumns>();
692 new_columns_list->set(new_columns_list->columns, new_columns);
693 if (const auto * indices = query_ast->as<ASTCreateQuery>()->columns_list->indices)
694 new_columns_list->set(new_columns_list->indices, indices->clone());
695
696 new_query.replace(new_query.columns_list, new_columns_list);
697
698 return new_query_ast;
699}
700
701/// Replaces ENGINE and table name in a create query
702std::shared_ptr<ASTCreateQuery> rewriteCreateQueryStorage(const ASTPtr & create_query_ast,
703 const DatabaseAndTableName & new_table,
704 const ASTPtr & new_storage_ast)
705{
706 const auto & create = create_query_ast->as<ASTCreateQuery &>();
707 auto res = std::make_shared<ASTCreateQuery>(create);
708
709 if (create.storage == nullptr || new_storage_ast == nullptr)
710 throw Exception("Storage is not specified", ErrorCodes::LOGICAL_ERROR);
711
712 res->database = new_table.first;
713 res->table = new_table.second;
714
715 res->children.clear();
716 res->set(res->columns_list, create.columns_list->clone());
717 res->set(res->storage, new_storage_ast->clone());
718
719 return res;
720}
721
722
723bool ClusterCopier::tryDropPartitionPiece(
724 ShardPartition & task_partition,
725 const size_t current_piece_number,
726 const zkutil::ZooKeeperPtr & zookeeper,
727 const CleanStateClock & clean_state_clock)
728{
729 if (is_safe_mode)
730 throw Exception("DROP PARTITION is prohibited in safe mode", ErrorCodes::NOT_IMPLEMENTED);
731
732 TaskTable & task_table = task_partition.task_shard.task_table;
733 ShardPartitionPiece & partition_piece = task_partition.pieces[current_piece_number];
734
735 const String current_shards_path = partition_piece.getPartitionPieceShardsPath();
736 const String current_partition_active_workers_dir = partition_piece.getPartitionPieceActiveWorkersPath();
737 const String is_dirty_flag_path = partition_piece.getPartitionPieceIsDirtyPath();
738 const String dirty_cleaner_path = partition_piece.getPartitionPieceCleanerPath();
739 const String is_dirty_cleaned_path = partition_piece.getPartitionPieceIsCleanedPath();
740
741 zkutil::EphemeralNodeHolder::Ptr cleaner_holder;
742 try
743 {
744 cleaner_holder = zkutil::EphemeralNodeHolder::create(dirty_cleaner_path, *zookeeper, host_id);
745 }
746 catch (const Coordination::Exception & e)
747 {
748 if (e.code == Coordination::Error::ZNODEEXISTS)
749 {
750 LOG_DEBUG(log, "Partition {} piece {} is cleaning now by somebody, sleep", task_partition.name, toString(current_piece_number));
751 std::this_thread::sleep_for(default_sleep_time);
752 return false;
753 }
754
755 throw;
756 }
757
758 Coordination::Stat stat{};
759 if (zookeeper->exists(current_partition_active_workers_dir, &stat))
760 {
761 if (stat.numChildren != 0)
762 {
763 LOG_DEBUG(log, "Partition {} contains {} active workers while trying to drop it. Going to sleep.", task_partition.name, stat.numChildren);
764 std::this_thread::sleep_for(default_sleep_time);
765 return false;
766 }
767 else
768 {
769 zookeeper->remove(current_partition_active_workers_dir);
770 }
771 }
772
773 {
774 zkutil::EphemeralNodeHolder::Ptr active_workers_lock;
775 try
776 {
777 active_workers_lock = zkutil::EphemeralNodeHolder::create(current_partition_active_workers_dir, *zookeeper, host_id);
778 }
779 catch (const Coordination::Exception & e)
780 {
781 if (e.code == Coordination::Error::ZNODEEXISTS)
782 {
783 LOG_DEBUG(log, "Partition {} is being filled now by somebody, sleep", task_partition.name);
784 return false;
785 }
786
787 throw;
788 }
789
790 // Lock the dirty flag
791 zookeeper->set(is_dirty_flag_path, host_id, clean_state_clock.discovery_version.value());
792 zookeeper->tryRemove(partition_piece.getPartitionPieceCleanStartPath());
793 CleanStateClock my_clock(zookeeper, is_dirty_flag_path, is_dirty_cleaned_path);
794
795 /// Remove all status nodes
796 {
797 Strings children;
798 if (zookeeper->tryGetChildren(current_shards_path, children) == Coordination::Error::ZOK)
799 for (const auto & child : children)
800 {
801 zookeeper->removeRecursive(current_shards_path + "/" + child);
802 }
803 }
804
805
806 DatabaseAndTableName original_table = task_table.table_push;
807 DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));
808
809 String query = "ALTER TABLE " + getQuotedTable(helping_table);
810 query += " DROP PARTITION " + task_partition.name + "";
811
812 /// TODO: use this statement after servers will be updated up to 1.1.54310
813 // query += " DROP PARTITION ID '" + task_partition.name + "'";
814
815 ClusterPtr & cluster_push = task_table.cluster_push;
816 Settings settings_push = task_cluster->settings_push;
817
818 /// It is important, DROP PARTITION must be done synchronously
819 settings_push.replication_alter_partitions_sync = 2;
820
821 LOG_DEBUG(log, "Execute distributed DROP PARTITION: {}", query);
822 /// We have to drop partition_piece on each replica
823 size_t num_shards = executeQueryOnCluster(
824 cluster_push, query,
825 settings_push,
826 PoolMode::GET_MANY,
827 ClusterExecutionMode::ON_EACH_NODE);
828
829 LOG_INFO(log, "DROP PARTITION was successfully executed on {} nodes of a cluster.", num_shards);
830
831 /// Update the locking node
832 if (!my_clock.is_stale())
833 {
834 zookeeper->set(is_dirty_flag_path, host_id, my_clock.discovery_version.value());
835 if (my_clock.clean_state_version)
836 zookeeper->set(is_dirty_cleaned_path, host_id, my_clock.clean_state_version.value());
837 else
838 zookeeper->create(is_dirty_cleaned_path, host_id, zkutil::CreateMode::Persistent);
839 }
840 else
841 {
842 LOG_DEBUG(log, "Clean state is altered when dropping the partition, cowardly bailing");
843 /// clean state is stale
844 return false;
845 }
846
847 LOG_INFO(log, "Partition {} piece {} was dropped on cluster {}", task_partition.name, toString(current_piece_number), task_table.cluster_push_name);
848 if (zookeeper->tryCreate(current_shards_path, host_id, zkutil::CreateMode::Persistent) == Coordination::Error::ZNODEEXISTS)
849 zookeeper->set(current_shards_path, host_id);
850 }
851
852 LOG_INFO(log, "Partition {} piece {} is safe for work now.", task_partition.name, toString(current_piece_number));
853 return true;
854}
855
856bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTable & task_table)
857{
858 /// An heuristic: if previous shard is already done, then check next one without sleeps due to max_workers constraint
859 bool previous_shard_is_instantly_finished = false;
860
861 /// Process each partition that is present in cluster
862 for (const String & partition_name : task_table.ordered_partition_names)
863 {
864 if (!task_table.cluster_partitions.count(partition_name))
865 throw Exception("There are no expected partition " + partition_name + ". It is a bug", ErrorCodes::LOGICAL_ERROR);
866
867 ClusterPartition & cluster_partition = task_table.cluster_partitions[partition_name];
868
869 Stopwatch watch;
870 /// We will check all the shards of the table and check if they contain current partition.
871 TasksShard expected_shards;
872 UInt64 num_failed_shards = 0;
873
874 ++cluster_partition.total_tries;
875
876 LOG_DEBUG(log, "Processing partition {} for the whole cluster", partition_name);
877
878 /// Process each source shard having current partition and copy current partition
879 /// NOTE: shards are sorted by "distance" to current host
880 bool has_shard_to_process = false;
881 for (const TaskShardPtr & shard : task_table.all_shards)
882 {
883 /// Does shard have a node with current partition?
884 if (shard->partition_tasks.count(partition_name) == 0)
885 {
886 /// If not, did we check existence of that partition previously?
887 if (shard->checked_partitions.count(partition_name) == 0)
888 {
889 auto check_shard_has_partition = [&] () { return checkShardHasPartition(timeouts, *shard, partition_name); };
890 bool has_partition = retry(check_shard_has_partition);
891
892 shard->checked_partitions.emplace(partition_name);
893
894 if (has_partition)
895 {
896 const size_t number_of_splits = task_table.number_of_splits;
897 shard->partition_tasks.emplace(partition_name, ShardPartition(*shard, partition_name, number_of_splits));
898 LOG_DEBUG(log, "Discovered partition {} in shard {}", partition_name, shard->getDescription());
899 /// To save references in the future.
900 auto shard_partition_it = shard->partition_tasks.find(partition_name);
901 PartitionPieces & shard_partition_pieces = shard_partition_it->second.pieces;
902
903 for (size_t piece_number = 0; piece_number < number_of_splits; ++piece_number)
904 {
905 auto res = checkPresentPartitionPiecesOnCurrentShard(timeouts, *shard, partition_name, piece_number);
906 shard_partition_pieces.emplace_back(shard_partition_it->second, piece_number, res);
907 }
908 }
909 else
910 {
911 LOG_DEBUG(log, "Found that shard {} does not contain current partition {}", shard->getDescription(), partition_name);
912 continue;
913 }
914 }
915 else
916 {
917 /// We have already checked that partition, but did not discover it
918 previous_shard_is_instantly_finished = true;
919 continue;
920 }
921 }
922
923 auto it_shard_partition = shard->partition_tasks.find(partition_name);
924 /// Previously when we discovered that shard does not contain current partition, we skipped it.
925 /// At this moment partition have to be present.
926 if (it_shard_partition == shard->partition_tasks.end())
927 throw Exception("There are no such partition in a shard. This is a bug.", ErrorCodes::LOGICAL_ERROR);
928 auto & partition = it_shard_partition->second;
929
930 expected_shards.emplace_back(shard);
931
932 /// Do not sleep if there is a sequence of already processed shards to increase startup
933 bool is_unprioritized_task = !previous_shard_is_instantly_finished && shard->priority.is_remote;
934 TaskStatus task_status = TaskStatus::Error;
935 bool was_error = false;
936 has_shard_to_process = true;
937 for (UInt64 try_num = 0; try_num < max_shard_partition_tries; ++try_num)
938 {
939 task_status = tryProcessPartitionTask(timeouts, partition, is_unprioritized_task);
940
941 /// Exit if success
942 if (task_status == TaskStatus::Finished)
943 break;
944
945 was_error = true;
946
947 /// Skip if the task is being processed by someone
948 if (task_status == TaskStatus::Active)
949 break;
950
951 /// Repeat on errors
952 std::this_thread::sleep_for(default_sleep_time);
953 }
954
955 if (task_status == TaskStatus::Error)
956 ++num_failed_shards;
957
958 previous_shard_is_instantly_finished = !was_error;
959 }
960
961 cluster_partition.elapsed_time_seconds += watch.elapsedSeconds();
962
963 /// Check that whole cluster partition is done
964 /// Firstly check the number of failed partition tasks, then look into ZooKeeper and ensure that each partition is done
965 bool partition_copying_is_done = num_failed_shards == 0;
966 try
967 {
968 partition_copying_is_done =
969 !has_shard_to_process
970 || (partition_copying_is_done && checkAllPiecesInPartitionAreDone(task_table, partition_name, expected_shards));
971 }
972 catch (...)
973 {
974 tryLogCurrentException(log);
975 partition_copying_is_done = false;
976 }
977
978
979 bool partition_moving_is_done = false;
980 /// Try to move only if all pieces were copied.
981 if (partition_copying_is_done)
982 {
983 for (UInt64 try_num = 0; try_num < max_shard_partition_piece_tries_for_alter; ++try_num)
984 {
985 try
986 {
987 auto res = tryMoveAllPiecesToDestinationTable(task_table, partition_name);
988 /// Exit and mark current task is done.
989 if (res == TaskStatus::Finished)
990 {
991 partition_moving_is_done = true;
992 break;
993 }
994
995 /// Exit if this task is active.
996 if (res == TaskStatus::Active)
997 break;
998
999 /// Repeat on errors.
1000 std::this_thread::sleep_for(default_sleep_time);
1001 }
1002 catch (...)
1003 {
1004 tryLogCurrentException(log, "Some error occurred while moving pieces to destination table for partition " + partition_name);
1005 }
1006 }
1007 }
1008
1009 if (partition_copying_is_done && partition_moving_is_done)
1010 {
1011 task_table.finished_cluster_partitions.emplace(partition_name);
1012
1013 task_table.bytes_copied += cluster_partition.bytes_copied;
1014 task_table.rows_copied += cluster_partition.rows_copied;
1015 double elapsed = cluster_partition.elapsed_time_seconds;
1016
1017 LOG_INFO(log, "It took {} seconds to copy partition {}: {} uncompressed bytes, {} rows and {} source blocks are copied",
1018 elapsed, partition_name,
1019 formatReadableSizeWithDecimalSuffix(cluster_partition.bytes_copied),
1020 formatReadableQuantity(cluster_partition.rows_copied),
1021 cluster_partition.blocks_copied);
1022
1023 if (cluster_partition.rows_copied)
1024 {
1025 LOG_INFO(log, "Average partition speed: {} per second.", formatReadableSizeWithDecimalSuffix(cluster_partition.bytes_copied / elapsed));
1026 }
1027
1028 if (task_table.rows_copied)
1029 {
1030 LOG_INFO(log, "Average table {} speed: {} per second.", task_table.table_id, formatReadableSizeWithDecimalSuffix(task_table.bytes_copied / elapsed));
1031 }
1032 }
1033 }
1034
1035 UInt64 required_partitions = task_table.cluster_partitions.size();
1036 UInt64 finished_partitions = task_table.finished_cluster_partitions.size();
1037 bool table_is_done = finished_partitions >= required_partitions;
1038
1039 if (!table_is_done)
1040 {
1041 LOG_INFO(log, "Table {} is not processed yet.Copied {} of {}, will retry", task_table.table_id, finished_partitions, required_partitions);
1042 }
1043
1044 return table_is_done;
1045}
1046
1047/// Job for copying partition from particular shard.
1048TaskStatus ClusterCopier::tryProcessPartitionTask(const ConnectionTimeouts & timeouts, ShardPartition & task_partition, bool is_unprioritized_task)
1049{
1050 TaskStatus res;
1051
1052 try
1053 {
1054 res = iterateThroughAllPiecesInPartition(timeouts, task_partition, is_unprioritized_task);
1055 }
1056 catch (...)
1057 {
1058 tryLogCurrentException(log, "An error occurred while processing partition " + task_partition.name);
1059 res = TaskStatus::Error;
1060 }
1061
1062 /// At the end of each task check if the config is updated
1063 try
1064 {
1065 updateConfigIfNeeded();
1066 }
1067 catch (...)
1068 {
1069 tryLogCurrentException(log, "An error occurred while updating the config");
1070 }
1071
1072 return res;
1073}
1074
1075TaskStatus ClusterCopier::iterateThroughAllPiecesInPartition(const ConnectionTimeouts & timeouts, ShardPartition & task_partition,
1076 bool is_unprioritized_task)
1077{
1078 const size_t total_number_of_pieces = task_partition.task_shard.task_table.number_of_splits;
1079
1080 TaskStatus res{TaskStatus::Finished};
1081
1082 bool was_failed_pieces = false;
1083 bool was_active_pieces = false;
1084
1085 for (size_t piece_number = 0; piece_number < total_number_of_pieces; piece_number++)
1086 {
1087 for (UInt64 try_num = 0; try_num < max_shard_partition_tries; ++try_num)
1088 {
1089 LOG_INFO(log, "Attempt number {} to process partition {} piece number {} on shard number {} with index {}.",
1090 try_num, task_partition.name, piece_number,
1091 task_partition.task_shard.numberInCluster(),
1092 task_partition.task_shard.indexInCluster());
1093
1094 res = processPartitionPieceTaskImpl(timeouts, task_partition, piece_number, is_unprioritized_task);
1095
1096 /// Exit if success
1097 if (res == TaskStatus::Finished)
1098 break;
1099
1100 /// Skip if the task is being processed by someone
1101 if (res == TaskStatus::Active)
1102 break;
1103
1104 /// Repeat on errors
1105 std::this_thread::sleep_for(default_sleep_time);
1106 }
1107
1108 was_active_pieces = (res == TaskStatus::Active);
1109 was_failed_pieces = (res == TaskStatus::Error);
1110 }
1111
1112 if (was_failed_pieces)
1113 return TaskStatus::Error;
1114
1115 if (was_active_pieces)
1116 return TaskStatus::Active;
1117
1118 return TaskStatus::Finished;
1119}
1120
1121
1122TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
1123 const ConnectionTimeouts & timeouts, ShardPartition & task_partition,
1124 const size_t current_piece_number, bool is_unprioritized_task)
1125{
1126 TaskShard & task_shard = task_partition.task_shard;
1127 TaskTable & task_table = task_shard.task_table;
1128 ClusterPartition & cluster_partition = task_table.getClusterPartition(task_partition.name);
1129 ShardPartitionPiece & partition_piece = task_partition.pieces[current_piece_number];
1130
1131 const size_t number_of_splits = task_table.number_of_splits;
1132 const String primary_key_comma_separated = task_table.primary_key_comma_separated;
1133
1134 /// We need to update table definitions for each partition, it could be changed after ALTER
1135 createShardInternalTables(timeouts, task_shard, true);
1136
1137 auto split_table_for_current_piece = task_shard.list_of_split_tables_on_shard[current_piece_number];
1138
1139 auto zookeeper = context.getZooKeeper();
1140
1141 const String piece_is_dirty_flag_path = partition_piece.getPartitionPieceIsDirtyPath();
1142 const String piece_is_dirty_cleaned_path = partition_piece.getPartitionPieceIsCleanedPath();
1143 const String current_task_piece_is_active_path = partition_piece.getActiveWorkerPath();
1144 const String current_task_piece_status_path = partition_piece.getShardStatusPath();
1145
1146 /// Auxiliary functions:
1147
1148 /// Creates is_dirty node to initialize DROP PARTITION
1149 auto create_is_dirty_node = [&] (const CleanStateClock & clock)
1150 {
1151 if (clock.is_stale())
1152 LOG_DEBUG(log, "Clean state clock is stale while setting dirty flag, cowardly bailing");
1153 else if (!clock.is_clean())
1154 LOG_DEBUG(log, "Thank you, Captain Obvious");
1155 else if (clock.discovery_version)
1156 {
1157 LOG_DEBUG(log, "Updating clean state clock");
1158 zookeeper->set(piece_is_dirty_flag_path, host_id, clock.discovery_version.value());
1159 }
1160 else
1161 {
1162 LOG_DEBUG(log, "Creating clean state clock");
1163 zookeeper->create(piece_is_dirty_flag_path, host_id, zkutil::CreateMode::Persistent);
1164 }
1165 };
1166
1167 /// Returns SELECT query filtering current partition and applying user filter
1168 auto get_select_query = [&] (const DatabaseAndTableName & from_table, const String & fields, bool enable_splitting, String limit = "")
1169 {
1170 String query;
1171 query += "SELECT " + fields + " FROM " + getQuotedTable(from_table);
1172
1173 if (enable_splitting && experimental_use_sample_offset)
1174 query += " SAMPLE 1/" + toString(number_of_splits) + " OFFSET " + toString(current_piece_number) + "/" + toString(number_of_splits);
1175
1176 /// TODO: Bad, it is better to rewrite with ASTLiteral(partition_key_field)
1177 query += " WHERE (" + queryToString(task_table.engine_push_partition_key_ast) + " = (" + task_partition.name + " AS partition_key))";
1178
1179 if (enable_splitting && !experimental_use_sample_offset)
1180 query += " AND ( cityHash64(" + primary_key_comma_separated + ") %" + toString(number_of_splits) + " = " + toString(current_piece_number) + " )";
1181
1182 if (!task_table.where_condition_str.empty())
1183 query += " AND (" + task_table.where_condition_str + ")";
1184
1185 if (!limit.empty())
1186 query += " LIMIT " + limit;
1187
1188 ParserQuery p_query(query.data() + query.size());
1189
1190 const auto & settings = context.getSettingsRef();
1191 return parseQuery(p_query, query, settings.max_query_size, settings.max_parser_depth);
1192 };
1193
1194 /// Load balancing
1195 auto worker_node_holder = createTaskWorkerNodeAndWaitIfNeed(zookeeper, current_task_piece_status_path, is_unprioritized_task);
1196
1197 LOG_DEBUG(log, "Processing {}", current_task_piece_status_path);
1198
1199 const String piece_status_path = partition_piece.getPartitionPieceShardsPath();
1200
1201 CleanStateClock clean_state_clock(zookeeper, piece_is_dirty_flag_path, piece_is_dirty_cleaned_path);
1202
1203 const bool is_clean = checkPartitionPieceIsClean(zookeeper, clean_state_clock, piece_status_path);
1204
1205 /// Do not start if partition piece is dirty, try to clean it
1206 if (is_clean)
1207 {
1208 LOG_DEBUG(log, "Partition {} piece {} appears to be clean", task_partition.name, current_piece_number);
1209 zookeeper->createAncestors(current_task_piece_status_path);
1210 }
1211 else
1212 {
1213 LOG_DEBUG(log, "Partition {} piece {} is dirty, try to drop it", task_partition.name, current_piece_number);
1214
1215 try
1216 {
1217 tryDropPartitionPiece(task_partition, current_piece_number, zookeeper, clean_state_clock);
1218 }
1219 catch (...)
1220 {
1221 tryLogCurrentException(log, "An error occurred when clean partition");
1222 }
1223
1224 return TaskStatus::Error;
1225 }
1226
1227 /// Create ephemeral node to mark that we are active and process the partition
1228 zookeeper->createAncestors(current_task_piece_is_active_path);
1229 zkutil::EphemeralNodeHolderPtr partition_task_node_holder;
1230 try
1231 {
1232 partition_task_node_holder = zkutil::EphemeralNodeHolder::create(current_task_piece_is_active_path, *zookeeper, host_id);
1233 }
1234 catch (const Coordination::Exception & e)
1235 {
1236 if (e.code == Coordination::Error::ZNODEEXISTS)
1237 {
1238 LOG_DEBUG(log, "Someone is already processing {}", current_task_piece_is_active_path);
1239 return TaskStatus::Active;
1240 }
1241
1242 throw;
1243 }
1244
1245 /// Exit if task has been already processed;
1246 /// create blocking node to signal cleaning up if it is abandoned
1247 {
1248 String status_data;
1249 if (zookeeper->tryGet(current_task_piece_status_path, status_data))
1250 {
1251 TaskStateWithOwner status = TaskStateWithOwner::fromString(status_data);
1252 if (status.state == TaskState::Finished)
1253 {
1254 LOG_DEBUG(log, "Task {} has been successfully executed by {}", current_task_piece_status_path, status.owner);
1255 return TaskStatus::Finished;
1256 }
1257
1258 /// Task is abandoned, because previously we created ephemeral node, possibly in other copier's process.
1259 /// Initialize DROP PARTITION
1260 LOG_DEBUG(log, "Task {} has not been successfully finished by {}. Partition will be dropped and refilled.", current_task_piece_status_path, status.owner);
1261
1262 create_is_dirty_node(clean_state_clock);
1263 return TaskStatus::Error;
1264 }
1265 }
1266
1267
1268 /// Exit if current piece is absent on this shard. Also mark it as finished, because we will check
1269 /// whether each shard have processed each partitition (and its pieces).
1270 if (partition_piece.is_absent_piece)
1271 {
1272 String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
1273 auto res = zookeeper->tryCreate(current_task_piece_status_path, state_finished, zkutil::CreateMode::Persistent);
1274 if (res == Coordination::Error::ZNODEEXISTS)
1275 LOG_DEBUG(log, "Partition {} piece {} is absent on current replica of a shard. But other replicas have already marked it as done.", task_partition.name, current_piece_number);
1276 if (res == Coordination::Error::ZOK)
1277 LOG_DEBUG(log, "Partition {} piece {} is absent on current replica of a shard. Will mark it as done. Other replicas will do the same.", task_partition.name, current_piece_number);
1278 return TaskStatus::Finished;
1279 }
1280
1281 /// Check that destination partition is empty if we are first worker
1282 /// NOTE: this check is incorrect if pull and push tables have different partition key!
1283 String clean_start_status;
1284 if (!zookeeper->tryGet(partition_piece.getPartitionPieceCleanStartPath(), clean_start_status) || clean_start_status != "ok")
1285 {
1286 zookeeper->createIfNotExists(partition_piece.getPartitionPieceCleanStartPath(), "");
1287 auto checker = zkutil::EphemeralNodeHolder::create(partition_piece.getPartitionPieceCleanStartPath() + "/checker",
1288 *zookeeper, host_id);
1289 // Maybe we are the first worker
1290
1291 ASTPtr query_select_ast = get_select_query(split_table_for_current_piece, "count()", /*enable_splitting*/ true);
1292 UInt64 count;
1293 {
1294 Context local_context = context;
1295 // Use pull (i.e. readonly) settings, but fetch data from destination servers
1296 local_context.setSettings(task_cluster->settings_pull);
1297 local_context.setSetting("skip_unavailable_shards", true);
1298
1299 Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_select_ast, local_context)->execute().getInputStream());
1300 count = (block) ? block.safeGetByPosition(0).column->getUInt(0) : 0;
1301 }
1302
1303 if (count != 0)
1304 {
1305 LOG_INFO(log, "Partition {} piece {}is not empty. In contains {} rows.", task_partition.name, current_piece_number, count);
1306 Coordination::Stat stat_shards{};
1307 zookeeper->get(partition_piece.getPartitionPieceShardsPath(), &stat_shards);
1308
1309 /// NOTE: partition is still fresh if dirt discovery happens before cleaning
1310 if (stat_shards.numChildren == 0)
1311 {
1312 LOG_WARNING(log, "There are no workers for partition {} piece {}, but destination table contains {} rows. Partition will be dropped and refilled.", task_partition.name, toString(current_piece_number), count);
1313
1314 create_is_dirty_node(clean_state_clock);
1315 return TaskStatus::Error;
1316 }
1317 }
1318 zookeeper->set(partition_piece.getPartitionPieceCleanStartPath(), "ok");
1319 }
1320 /// At this point, we need to sync that the destination table is clean
1321 /// before any actual work
1322
1323 /// Try start processing, create node about it
1324 {
1325 String start_state = TaskStateWithOwner::getData(TaskState::Started, host_id);
1326 CleanStateClock new_clean_state_clock(zookeeper, piece_is_dirty_flag_path, piece_is_dirty_cleaned_path);
1327 if (clean_state_clock != new_clean_state_clock)
1328 {
1329 LOG_INFO(log, "Partition {} piece {} clean state changed, cowardly bailing", task_partition.name, toString(current_piece_number));
1330 return TaskStatus::Error;
1331 }
1332 else if (!new_clean_state_clock.is_clean())
1333 {
1334 LOG_INFO(log, "Partition {} piece {} is dirty and will be dropped and refilled", task_partition.name, toString(current_piece_number));
1335 create_is_dirty_node(new_clean_state_clock);
1336 return TaskStatus::Error;
1337 }
1338 zookeeper->create(current_task_piece_status_path, start_state, zkutil::CreateMode::Persistent);
1339 }
1340
1341 /// Try create table (if not exists) on each shard
1342 {
1343 /// Define push table for current partition piece
1344 auto database_and_table_for_current_piece= std::pair<String, String>(
1345 task_table.table_push.first,
1346 task_table.table_push.second + "_piece_" + toString(current_piece_number));
1347
1348 auto new_engine_push_ast = task_table.engine_push_ast;
1349 if (task_table.isReplicatedTable())
1350 {
1351 new_engine_push_ast = task_table.rewriteReplicatedCreateQueryToPlain();
1352 }
1353
1354 auto create_query_push_ast = rewriteCreateQueryStorage(
1355 task_shard.current_pull_table_create_query,
1356 database_and_table_for_current_piece, new_engine_push_ast);
1357
1358 create_query_push_ast->as<ASTCreateQuery &>().if_not_exists = true;
1359 String query = queryToString(create_query_push_ast);
1360
1361 LOG_DEBUG(log, "Create destination tables. Query: {}", query);
1362 UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY);
1363 LOG_DEBUG(log, "Destination tables {} have been created on {} shards of {}",
1364 getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount());
1365 }
1366
1367 /// Do the copying
1368 {
1369 bool inject_fault = false;
1370 if (copy_fault_probability > 0)
1371 {
1372 double value = std::uniform_real_distribution<>(0, 1)(task_table.task_cluster.random_engine);
1373 inject_fault = value < copy_fault_probability;
1374 }
1375
1376 // Select all fields
1377 ASTPtr query_select_ast = get_select_query(task_shard.table_read_shard, "*", /*enable_splitting*/ true, inject_fault ? "1" : "");
1378
1379 LOG_DEBUG(log, "Executing SELECT query and pull from {} : {}", task_shard.getDescription(), queryToString(query_select_ast));
1380
1381 ASTPtr query_insert_ast;
1382 {
1383 String query;
1384 query += "INSERT INTO " + getQuotedTable(split_table_for_current_piece) + " VALUES ";
1385
1386 ParserQuery p_query(query.data() + query.size());
1387 const auto & settings = context.getSettingsRef();
1388 query_insert_ast = parseQuery(p_query, query, settings.max_query_size, settings.max_parser_depth);
1389
1390 LOG_DEBUG(log, "Executing INSERT query: {}", query);
1391 }
1392
1393 try
1394 {
1395 std::unique_ptr<Context> context_select = std::make_unique<Context>(context);
1396 context_select->setSettings(task_cluster->settings_pull);
1397
1398 std::unique_ptr<Context> context_insert = std::make_unique<Context>(context);
1399 context_insert->setSettings(task_cluster->settings_push);
1400
1401 /// Custom INSERT SELECT implementation
1402 BlockInputStreamPtr input;
1403 BlockOutputStreamPtr output;
1404 {
1405 BlockIO io_select = InterpreterFactory::get(query_select_ast, *context_select)->execute();
1406 BlockIO io_insert = InterpreterFactory::get(query_insert_ast, *context_insert)->execute();
1407
1408 input = io_select.getInputStream();
1409 output = io_insert.out;
1410 }
1411
1412 /// Fail-fast optimization to abort copying when the current clean state expires
1413 std::future<Coordination::ExistsResponse> future_is_dirty_checker;
1414
1415 Stopwatch watch(CLOCK_MONOTONIC_COARSE);
1416 constexpr UInt64 check_period_milliseconds = 500;
1417
1418 /// Will asynchronously check that ZooKeeper connection and is_dirty flag appearing while copying data
1419 auto cancel_check = [&] ()
1420 {
1421 if (zookeeper->expired())
1422 throw Exception("ZooKeeper session is expired, cancel INSERT SELECT", ErrorCodes::UNFINISHED);
1423
1424 if (!future_is_dirty_checker.valid())
1425 future_is_dirty_checker = zookeeper->asyncExists(piece_is_dirty_flag_path);
1426
1427 /// check_period_milliseconds should less than average insert time of single block
1428 /// Otherwise, the insertion will slow a little bit
1429 if (watch.elapsedMilliseconds() >= check_period_milliseconds)
1430 {
1431 Coordination::ExistsResponse status = future_is_dirty_checker.get();
1432
1433 if (status.error != Coordination::Error::ZNONODE)
1434 {
1435 LogicalClock dirt_discovery_epoch (status.stat.mzxid);
1436 if (dirt_discovery_epoch == clean_state_clock.discovery_zxid)
1437 return false;
1438 throw Exception("Partition is dirty, cancel INSERT SELECT", ErrorCodes::UNFINISHED);
1439 }
1440 }
1441
1442 return false;
1443 };
1444
1445 /// Update statistics
1446 /// It is quite rough: bytes_copied don't take into account DROP PARTITION.
1447 auto update_stats = [&cluster_partition] (const Block & block)
1448 {
1449 cluster_partition.bytes_copied += block.bytes();
1450 cluster_partition.rows_copied += block.rows();
1451 cluster_partition.blocks_copied += 1;
1452 };
1453
1454 /// Main work is here
1455 copyData(*input, *output, cancel_check, update_stats);
1456
1457 // Just in case
1458 if (future_is_dirty_checker.valid())
1459 future_is_dirty_checker.get();
1460
1461 if (inject_fault)
1462 throw Exception("Copy fault injection is activated", ErrorCodes::UNFINISHED);
1463 }
1464 catch (...)
1465 {
1466 tryLogCurrentException(log, "An error occurred during copying, partition will be marked as dirty");
1467 create_is_dirty_node(clean_state_clock);
1468 return TaskStatus::Error;
1469 }
1470 }
1471
1472 LOG_INFO(log, "Partition {} piece {} copied. But not moved to original destination table.", task_partition.name, toString(current_piece_number));
1473
1474
1475 /// Try create original table (if not exists) on each shard
1476 try
1477 {
1478 auto create_query_push_ast = rewriteCreateQueryStorage(task_shard.current_pull_table_create_query,
1479 task_table.table_push, task_table.engine_push_ast);
1480 create_query_push_ast->as<ASTCreateQuery &>().if_not_exists = true;
1481 String query = queryToString(create_query_push_ast);
1482
1483 LOG_DEBUG(log, "Create destination tables. Query: {}", query);
1484 UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY);
1485 LOG_DEBUG(log, "Destination tables {} have been created on {} shards of {}", getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount());
1486 }
1487 catch (...)
1488 {
1489 tryLogCurrentException(log, "Error while creating original table. Maybe we are not first.");
1490 }
1491
1492 /// Finalize the processing, change state of current partition task (and also check is_dirty flag)
1493 {
1494 String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
1495 CleanStateClock new_clean_state_clock (zookeeper, piece_is_dirty_flag_path, piece_is_dirty_cleaned_path);
1496 if (clean_state_clock != new_clean_state_clock)
1497 {
1498 LOG_INFO(log, "Partition {} piece {} clean state changed, cowardly bailing", task_partition.name, toString(current_piece_number));
1499 return TaskStatus::Error;
1500 }
1501 else if (!new_clean_state_clock.is_clean())
1502 {
1503 LOG_INFO(log, "Partition {} piece {} became dirty and will be dropped and refilled", task_partition.name, toString(current_piece_number));
1504 create_is_dirty_node(new_clean_state_clock);
1505 return TaskStatus::Error;
1506 }
1507 zookeeper->set(current_task_piece_status_path, state_finished, 0);
1508 }
1509
1510 return TaskStatus::Finished;
1511}
1512
1513void ClusterCopier::dropAndCreateLocalTable(const ASTPtr & create_ast)
1514{
1515 const auto & create = create_ast->as<ASTCreateQuery &>();
1516 dropLocalTableIfExists({create.database, create.table});
1517
1518 InterpreterCreateQuery interpreter(create_ast, context);
1519 interpreter.execute();
1520}
1521
1522void ClusterCopier::dropLocalTableIfExists(const DatabaseAndTableName & table_name) const
1523{
1524 auto drop_ast = std::make_shared<ASTDropQuery>();
1525 drop_ast->if_exists = true;
1526 drop_ast->database = table_name.first;
1527 drop_ast->table = table_name.second;
1528
1529 InterpreterDropQuery interpreter(drop_ast, context);
1530 interpreter.execute();
1531}
1532
1533
1534void ClusterCopier::dropHelpingTables(const TaskTable & task_table)
1535{
1536 LOG_DEBUG(log, "Removing helping tables");
1537 for (size_t current_piece_number = 0; current_piece_number < task_table.number_of_splits; ++current_piece_number)
1538 {
1539 DatabaseAndTableName original_table = task_table.table_push;
1540 DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));
1541
1542 String query = "DROP TABLE IF EXISTS " + getQuotedTable(helping_table);
1543
1544 const ClusterPtr & cluster_push = task_table.cluster_push;
1545 Settings settings_push = task_cluster->settings_push;
1546
1547 LOG_DEBUG(log, "Execute distributed DROP TABLE: {}", query);
1548 /// We have to drop partition_piece on each replica
1549 UInt64 num_nodes = executeQueryOnCluster(
1550 cluster_push, query,
1551 settings_push,
1552 PoolMode::GET_MANY,
1553 ClusterExecutionMode::ON_EACH_NODE);
1554
1555 LOG_DEBUG(log, "DROP TABLE query was successfully executed on {} nodes.", toString(num_nodes));
1556 }
1557}
1558
1559
1560void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskTable & task_table, const String & partition_name)
1561{
1562 LOG_DEBUG(log, "Try drop partition partition from all helping tables.");
1563 for (size_t current_piece_number = 0; current_piece_number < task_table.number_of_splits; ++current_piece_number)
1564 {
1565 DatabaseAndTableName original_table = task_table.table_push;
1566 DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));
1567
1568 String query = "ALTER TABLE " + getQuotedTable(helping_table) + " DROP PARTITION " + partition_name;
1569
1570 const ClusterPtr & cluster_push = task_table.cluster_push;
1571 Settings settings_push = task_cluster->settings_push;
1572
1573 LOG_DEBUG(log, "Execute distributed DROP PARTITION: {}", query);
1574 /// We have to drop partition_piece on each replica
1575 UInt64 num_nodes = executeQueryOnCluster(
1576 cluster_push, query,
1577 settings_push,
1578 PoolMode::GET_MANY,
1579 ClusterExecutionMode::ON_EACH_NODE);
1580
1581 LOG_DEBUG(log, "DROP PARTITION query was successfully executed on {} nodes.", toString(num_nodes));
1582 }
1583 LOG_DEBUG(log, "All helping tables dropped partition {}", partition_name);
1584}
1585
1586String ClusterCopier::getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings * settings)
1587{
1588 String query = "SHOW CREATE TABLE " + getQuotedTable(table);
1589 Block block = getBlockWithAllStreamData(std::make_shared<RemoteBlockInputStream>(
1590 connection, query, InterpreterShowCreateQuery::getSampleBlock(), context, settings));
1591
1592 return typeid_cast<const ColumnString &>(*block.safeGetByPosition(0).column).getDataAt(0).toString();
1593}
1594
1595ASTPtr ClusterCopier::getCreateTableForPullShard(const ConnectionTimeouts & timeouts, TaskShard & task_shard)
1596{
1597 /// Fetch and parse (possibly) new definition
1598 auto connection_entry = task_shard.info.pool->get(timeouts, &task_cluster->settings_pull, true);
1599 String create_query_pull_str = getRemoteCreateTable(
1600 task_shard.task_table.table_pull,
1601 *connection_entry,
1602 &task_cluster->settings_pull);
1603
1604 ParserCreateQuery parser_create_query;
1605 const auto & settings = context.getSettingsRef();
1606 return parseQuery(parser_create_query, create_query_pull_str, settings.max_query_size, settings.max_parser_depth);
1607}
1608
1609/// If it is implicitly asked to create split Distributed table for certain piece on current shard, we will do it.
1610void ClusterCopier::createShardInternalTables(const ConnectionTimeouts & timeouts,
1611 TaskShard & task_shard, bool create_split)
1612{
1613 TaskTable & task_table = task_shard.task_table;
1614
1615 /// We need to update table definitions for each part, it could be changed after ALTER
1616 task_shard.current_pull_table_create_query = getCreateTableForPullShard(timeouts, task_shard);
1617
1618 /// Create local Distributed tables:
1619 /// a table fetching data from current shard and a table inserting data to the whole destination cluster
1620 String read_shard_prefix = ".read_shard_" + toString(task_shard.indexInCluster()) + ".";
1621 String split_shard_prefix = ".split.";
1622 task_shard.table_read_shard = DatabaseAndTableName(working_database_name, read_shard_prefix + task_table.table_id);
1623 task_shard.main_table_split_shard = DatabaseAndTableName(working_database_name, split_shard_prefix + task_table.table_id);
1624
1625 for (const auto & piece_number : ext::range(0, task_table.number_of_splits))
1626 {
1627 task_shard.list_of_split_tables_on_shard[piece_number] =
1628 DatabaseAndTableName(working_database_name, split_shard_prefix + task_table.table_id + "_piece_" + toString(piece_number));
1629 }
1630
1631 /// Create special cluster with single shard
1632 String shard_read_cluster_name = read_shard_prefix + task_table.cluster_pull_name;
1633 ClusterPtr cluster_pull_current_shard = task_table.cluster_pull->getClusterWithSingleShard(task_shard.indexInCluster());
1634 context.setCluster(shard_read_cluster_name, cluster_pull_current_shard);
1635
1636 auto storage_shard_ast = createASTStorageDistributed(shard_read_cluster_name, task_table.table_pull.first, task_table.table_pull.second);
1637
1638 auto create_query_ast = removeAliasColumnsFromCreateQuery(task_shard.current_pull_table_create_query);
1639
1640 auto create_table_pull_ast = rewriteCreateQueryStorage(create_query_ast, task_shard.table_read_shard, storage_shard_ast);
1641 dropAndCreateLocalTable(create_table_pull_ast);
1642
1643 if (create_split)
1644 {
1645 auto create_table_split_piece_ast = rewriteCreateQueryStorage(
1646 create_query_ast,
1647 task_shard.main_table_split_shard,
1648 task_table.main_engine_split_ast);
1649
1650 dropAndCreateLocalTable(create_table_split_piece_ast);
1651
1652 /// Create auxiliary split tables for each piece
1653 for (const auto & piece_number : ext::range(0, task_table.number_of_splits))
1654 {
1655 const auto & storage_piece_split_ast = task_table.auxiliary_engine_split_asts[piece_number];
1656
1657 create_table_split_piece_ast = rewriteCreateQueryStorage(
1658 create_query_ast,
1659 task_shard.list_of_split_tables_on_shard[piece_number],
1660 storage_piece_split_ast);
1661
1662 dropAndCreateLocalTable(create_table_split_piece_ast);
1663 }
1664 }
1665
1666}
1667
1668
1669std::set<String> ClusterCopier::getShardPartitions(const ConnectionTimeouts & timeouts, TaskShard & task_shard)
1670{
1671 createShardInternalTables(timeouts, task_shard, false);
1672
1673 TaskTable & task_table = task_shard.task_table;
1674
1675 String query;
1676 {
1677 WriteBufferFromOwnString wb;
1678 wb << "SELECT DISTINCT " << queryToString(task_table.engine_push_partition_key_ast) << " AS partition FROM"
1679 << " " << getQuotedTable(task_shard.table_read_shard) << " ORDER BY partition DESC";
1680 query = wb.str();
1681 }
1682
1683 ParserQuery parser_query(query.data() + query.size());
1684 const auto & settings = context.getSettingsRef();
1685 ASTPtr query_ast = parseQuery(parser_query, query, settings.max_query_size, settings.max_parser_depth);
1686
1687 LOG_DEBUG(log, "Computing destination partition set, executing query: {}", query);
1688
1689 Context local_context = context;
1690 local_context.setSettings(task_cluster->settings_pull);
1691 Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_ast, local_context)->execute().getInputStream());
1692
1693 std::set<String> res;
1694 if (block)
1695 {
1696 ColumnWithTypeAndName & column = block.getByPosition(0);
1697 task_shard.partition_key_column = column;
1698
1699 for (size_t i = 0; i < column.column->size(); ++i)
1700 {
1701 WriteBufferFromOwnString wb;
1702 column.type->serializeAsTextQuoted(*column.column, i, wb, FormatSettings());
1703 res.emplace(wb.str());
1704 }
1705 }
1706
1707 LOG_DEBUG(log, "There are {} destination partitions in shard {}", res.size(), task_shard.getDescription());
1708
1709 return res;
1710}
1711
1712bool ClusterCopier::checkShardHasPartition(const ConnectionTimeouts & timeouts,
1713 TaskShard & task_shard, const String & partition_quoted_name)
1714{
1715 createShardInternalTables(timeouts, task_shard, false);
1716
1717 TaskTable & task_table = task_shard.task_table;
1718
1719 std::string query = "SELECT 1 FROM " + getQuotedTable(task_shard.table_read_shard)
1720 + " WHERE (" + queryToString(task_table.engine_push_partition_key_ast) +
1721 " = (" + partition_quoted_name + " AS partition_key))";
1722
1723 if (!task_table.where_condition_str.empty())
1724 query += " AND (" + task_table.where_condition_str + ")";
1725
1726 query += " LIMIT 1";
1727
1728 LOG_DEBUG(log, "Checking shard {} for partition {} existence, executing query: {}", task_shard.getDescription(), partition_quoted_name, query);
1729
1730 ParserQuery parser_query(query.data() + query.size());
1731const auto & settings = context.getSettingsRef();
1732 ASTPtr query_ast = parseQuery(parser_query, query, settings.max_query_size, settings.max_parser_depth);
1733
1734 Context local_context = context;
1735 local_context.setSettings(task_cluster->settings_pull);
1736 return InterpreterFactory::get(query_ast, local_context)->execute().getInputStream()->read().rows() != 0;
1737}
1738
1739bool ClusterCopier::checkPresentPartitionPiecesOnCurrentShard(const ConnectionTimeouts & timeouts,
1740 TaskShard & task_shard, const String & partition_quoted_name, size_t current_piece_number)
1741{
1742 createShardInternalTables(timeouts, task_shard, false);
1743
1744 TaskTable & task_table = task_shard.task_table;
1745 const size_t number_of_splits = task_table.number_of_splits;
1746 const String & primary_key_comma_separated = task_table.primary_key_comma_separated;
1747
1748 UNUSED(primary_key_comma_separated);
1749
1750 std::string query = "SELECT 1 FROM " + getQuotedTable(task_shard.table_read_shard);
1751
1752 if (experimental_use_sample_offset)
1753 query += " SAMPLE 1/" + toString(number_of_splits) + " OFFSET " + toString(current_piece_number) + "/" + toString(number_of_splits);
1754
1755 query += " WHERE (" + queryToString(task_table.engine_push_partition_key_ast)
1756 + " = (" + partition_quoted_name + " AS partition_key))";
1757
1758 if (!experimental_use_sample_offset)
1759 query += " AND (cityHash64(" + primary_key_comma_separated + ") % "
1760 + std::to_string(number_of_splits) + " = " + std::to_string(current_piece_number) + " )";
1761
1762 if (!task_table.where_condition_str.empty())
1763 query += " AND (" + task_table.where_condition_str + ")";
1764
1765 query += " LIMIT 1";
1766
1767 LOG_DEBUG(log, "Checking shard {} for partition {} piece {} existence, executing query: {}", task_shard.getDescription(), partition_quoted_name, std::to_string(current_piece_number), query);
1768
1769 ParserQuery parser_query(query.data() + query.size());
1770 const auto & settings = context.getSettingsRef();
1771 ASTPtr query_ast = parseQuery(parser_query, query, settings.max_query_size, settings.max_parser_depth);
1772
1773 Context local_context = context;
1774 local_context.setSettings(task_cluster->settings_pull);
1775 auto result = InterpreterFactory::get(query_ast, local_context)->execute().getInputStream()->read().rows();
1776 if (result != 0)
1777 LOG_DEBUG(log, "Partition {} piece number {} is PRESENT on shard {}", partition_quoted_name, std::to_string(current_piece_number), task_shard.getDescription());
1778 else
1779 LOG_DEBUG(log, "Partition {} piece number {} is ABSENT on shard {}", partition_quoted_name, std::to_string(current_piece_number), task_shard.getDescription());
1780 return result != 0;
1781}
1782
1783/** Executes simple query (without output streams, for example DDL queries) on each shard of the cluster
1784 * Returns number of shards for which at least one replica executed query successfully
1785 */
1786UInt64 ClusterCopier::executeQueryOnCluster(
1787 const ClusterPtr & cluster,
1788 const String & query,
1789 const Settings & current_settings,
1790 PoolMode pool_mode,
1791 ClusterExecutionMode execution_mode,
1792 UInt64 max_successful_executions_per_shard) const
1793{
1794 auto num_shards = cluster->getShardsInfo().size();
1795 std::vector<UInt64> per_shard_num_successful_replicas(num_shards, 0);
1796
1797 ParserQuery p_query(query.data() + query.size());
1798 ASTPtr query_ast = parseQuery(p_query, query, current_settings.max_query_size, current_settings.max_parser_depth);
1799
1800 /// We will have to execute query on each replica of a shard.
1801 if (execution_mode == ClusterExecutionMode::ON_EACH_NODE)
1802 max_successful_executions_per_shard = 0;
1803
1804 std::atomic<size_t> origin_replicas_number;
1805
1806 /// We need to execute query on one replica at least
1807 auto do_for_shard = [&] (UInt64 shard_index, Settings shard_settings)
1808 {
1809 setThreadName("QueryForShard");
1810
1811 const Cluster::ShardInfo & shard = cluster->getShardsInfo().at(shard_index);
1812 UInt64 & num_successful_executions = per_shard_num_successful_replicas.at(shard_index);
1813 num_successful_executions = 0;
1814
1815 auto increment_and_check_exit = [&] () -> bool
1816 {
1817 ++num_successful_executions;
1818 return max_successful_executions_per_shard && num_successful_executions >= max_successful_executions_per_shard;
1819 };
1820
1821 UInt64 num_replicas = cluster->getShardsAddresses().at(shard_index).size();
1822
1823 origin_replicas_number += num_replicas;
1824 UInt64 num_local_replicas = shard.getLocalNodeCount();
1825 UInt64 num_remote_replicas = num_replicas - num_local_replicas;
1826
1827 /// In that case we don't have local replicas, but do it just in case
1828 for (UInt64 i = 0; i < num_local_replicas; ++i)
1829 {
1830 auto interpreter = InterpreterFactory::get(query_ast, context);
1831 interpreter->execute();
1832
1833 if (increment_and_check_exit())
1834 return;
1835 }
1836
1837 /// Will try to make as many as possible queries
1838 if (shard.hasRemoteConnections())
1839 {
1840 shard_settings.max_parallel_replicas = num_remote_replicas ? num_remote_replicas : 1;
1841
1842 auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(shard_settings).getSaturated(shard_settings.max_execution_time);
1843 auto connections = shard.pool->getMany(timeouts, &shard_settings, pool_mode);
1844
1845 for (auto & connection : connections)
1846 {
1847 if (connection.isNull())
1848 continue;
1849
1850 try
1851 {
1852 /// CREATE TABLE and DROP PARTITION queries return empty block
1853 RemoteBlockInputStream stream{*connection, query, Block{}, context, &shard_settings};
1854 NullBlockOutputStream output{Block{}};
1855 copyData(stream, output);
1856
1857 if (increment_and_check_exit())
1858 return;
1859 }
1860 catch (const Exception &)
1861 {
1862 LOG_INFO(log, getCurrentExceptionMessage(false, true));
1863 }
1864 }
1865 }
1866 };
1867
1868 {
1869 ThreadPool thread_pool(std::min<UInt64>(num_shards, getNumberOfPhysicalCPUCores()));
1870
1871 for (UInt64 shard_index = 0; shard_index < num_shards; ++shard_index)
1872 thread_pool.scheduleOrThrowOnError([=, shard_settings = current_settings] { do_for_shard(shard_index, std::move(shard_settings)); });
1873
1874 thread_pool.wait();
1875 }
1876
1877 UInt64 successful_nodes = 0;
1878 for (UInt64 num_replicas : per_shard_num_successful_replicas)
1879 {
1880 if (execution_mode == ClusterExecutionMode::ON_EACH_NODE)
1881 successful_nodes += num_replicas;
1882 else
1883 /// Count only successful shards
1884 successful_nodes += (num_replicas > 0);
1885 }
1886
1887 if (execution_mode == ClusterExecutionMode::ON_EACH_NODE && successful_nodes != origin_replicas_number)
1888 {
1889 LOG_INFO(log, "There was an error while executing ALTER on each node. Query was executed on {} nodes. But had to be executed on {}", toString(successful_nodes), toString(origin_replicas_number.load()));
1890 }
1891
1892 return successful_nodes;
1893}
1894
1895}