Code · Commits · Issues · Pull requests · Actions · Insights · Security
ba69724a22291270a1cbaf603c575b5c2fee0558

Branches

Tags

  • No tags available.
0 Branches · 0 Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

Interpreters/InterpreterInsertQuery.cpp

292 lines · mode: code

1#include <Interpreters/InterpreterInsertQuery.h>
2
3#include <DataStreams/AddingDefaultBlockOutputStream.h>
4#include <DataStreams/AddingDefaultsBlockInputStream.h>
5#include <DataStreams/CheckConstraintsBlockOutputStream.h>
6#include <DataStreams/ConvertingBlockInputStream.h>
7#include <DataStreams/CountingBlockOutputStream.h>
8#include <DataStreams/InputStreamFromASTInsertQuery.h>
9#include <DataStreams/NullAndDoCopyBlockInputStream.h>
10#include <DataStreams/NullBlockOutputStream.h>
11#include <DataStreams/OwningBlockInputStream.h>
12#include <DataStreams/PushingToViewsBlockOutputStream.h>
13#include <DataStreams/RemoteBlockInputStream.h>
14#include <DataStreams/SquashingBlockOutputStream.h>
15#include <DataStreams/copyData.h>
16#include <IO/ConcatReadBuffer.h>
17#include <IO/ReadBufferFromMemory.h>
18#include <Interpreters/InterpreterSelectWithUnionQuery.h>
19#include <Access/AccessFlags.h>
20#include <Interpreters/JoinedTables.h>
21#include <Parsers/ASTFunction.h>
22#include <Parsers/ASTInsertQuery.h>
23#include <Parsers/ASTSelectQuery.h>
24#include <Parsers/ASTSelectWithUnionQuery.h>
25#include <Parsers/queryToString.h>
26#include <Storages/Kafka/StorageKafka.h>
27#include <Storages/StorageDistributed.h>
28#include <TableFunctions/TableFunctionFactory.h>
29#include <Common/checkStackSize.h>
30
31
32namespace DB
33{
34
/// Error codes raised by this interpreter. Only declared here (`extern`); the values are defined elsewhere.
namespace ErrorCodes
{
    extern const int DUPLICATE_COLUMN;
    extern const int ILLEGAL_COLUMN;
    extern const int LOGICAL_ERROR;
    extern const int NO_SUCH_COLUMN_IN_TABLE;
}
42
43
44InterpreterInsertQuery::InterpreterInsertQuery(
45 const ASTPtr & query_ptr_, const Context & context_, bool allow_materialized_, bool no_squash_, bool no_destination_)
46 : query_ptr(query_ptr_)
47 , context(context_)
48 , allow_materialized(allow_materialized_)
49 , no_squash(no_squash_)
50 , no_destination(no_destination_)
51{
52 checkStackSize();
53}
54
55
56StoragePtr InterpreterInsertQuery::getTable(ASTInsertQuery & query)
57{
58 if (query.table_function)
59 {
60 const auto * table_function = query.table_function->as<ASTFunction>();
61 const auto & factory = TableFunctionFactory::instance();
62 TableFunctionPtr table_function_ptr = factory.get(table_function->name, context);
63 return table_function_ptr->execute(query.table_function, context, table_function_ptr->getName());
64 }
65
66 query.table_id = context.resolveStorageID(query.table_id);
67 return DatabaseCatalog::instance().getTable(query.table_id);
68}
69
70Block InterpreterInsertQuery::getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table)
71{
72 Block table_sample_non_materialized = table->getSampleBlockNonMaterialized();
73 /// If the query does not include information about columns
74 if (!query.columns)
75 {
76 if (no_destination)
77 return table->getSampleBlockWithVirtuals();
78 else
79 return table_sample_non_materialized;
80 }
81
82 Block table_sample = table->getSampleBlock();
83 /// Form the block based on the column names from the query
84 Block res;
85 for (const auto & identifier : query.columns->children)
86 {
87 std::string current_name = identifier->getColumnName();
88
89 /// The table does not have a column with that name
90 if (!table_sample.has(current_name))
91 throw Exception("No such column " + current_name + " in table " + query.table_id.getNameForLogs(), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
92
93 if (!allow_materialized && !table_sample_non_materialized.has(current_name))
94 throw Exception("Cannot insert column " + current_name + ", because it is MATERIALIZED column.", ErrorCodes::ILLEGAL_COLUMN);
95 if (res.has(current_name))
96 throw Exception("Column " + current_name + " specified more than once", ErrorCodes::DUPLICATE_COLUMN);
97
98 res.insert(ColumnWithTypeAndName(table_sample.getByName(current_name).type, current_name));
99 }
100 return res;
101}
102
103
104BlockIO InterpreterInsertQuery::execute()
105{
106 const Settings & settings = context.getSettingsRef();
107 auto & query = query_ptr->as<ASTInsertQuery &>();
108
109 BlockIO res;
110
111 StoragePtr table = getTable(query);
112 auto table_lock = table->lockStructureForShare(true, context.getInitialQueryId());
113
114 auto query_sample_block = getSampleBlock(query, table);
115 if (!query.table_function)
116 context.checkAccess(AccessType::INSERT, query.table_id, query_sample_block.getNames());
117
118 BlockInputStreams in_streams;
119 BlockOutputStreams out_streams;
120 bool is_distributed_insert_select = false;
121
122 if (query.select && table->isRemote() && settings.parallel_distributed_insert_select)
123 {
124 // Distributed INSERT SELECT
125 std::shared_ptr<StorageDistributed> storage_src;
126 auto & select = query.select->as<ASTSelectWithUnionQuery &>();
127 auto new_query = std::dynamic_pointer_cast<ASTInsertQuery>(query.clone());
128 if (select.list_of_selects->children.size() == 1)
129 {
130 auto & select_query = select.list_of_selects->children.at(0)->as<ASTSelectQuery &>();
131 JoinedTables joined_tables(Context(context), select_query);
132
133 if (joined_tables.tablesCount() == 1)
134 {
135 storage_src = std::dynamic_pointer_cast<StorageDistributed>(joined_tables.getLeftTableStorage());
136 if (storage_src)
137 {
138 const auto select_with_union_query = std::make_shared<ASTSelectWithUnionQuery>();
139 select_with_union_query->list_of_selects = std::make_shared<ASTExpressionList>();
140
141 auto new_select_query = std::dynamic_pointer_cast<ASTSelectQuery>(select_query.clone());
142 select_with_union_query->list_of_selects->children.push_back(new_select_query);
143
144 new_select_query->replaceDatabaseAndTable(storage_src->getRemoteDatabaseName(), storage_src->getRemoteTableName());
145
146 new_query->select = select_with_union_query;
147 }
148 }
149 }
150
151 auto storage_dst = std::dynamic_pointer_cast<StorageDistributed>(table);
152
153 if (storage_src && storage_dst && storage_src->cluster_name == storage_dst->cluster_name)
154 {
155 is_distributed_insert_select = true;
156
157 const auto & cluster = storage_src->getCluster();
158 const auto & shards_info = cluster->getShardsInfo();
159
160 String new_query_str = queryToString(new_query);
161 for (size_t shard_index : ext::range(0, shards_info.size()))
162 {
163 const auto & shard_info = shards_info[shard_index];
164 if (shard_info.isLocal())
165 {
166 InterpreterInsertQuery interpreter(new_query, context);
167 auto block_io = interpreter.execute();
168 in_streams.push_back(block_io.in);
169 }
170 else
171 {
172 auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
173 auto connections = shard_info.pool->getMany(timeouts, &settings, PoolMode::GET_ONE);
174 if (connections.empty() || connections.front().isNull())
175 throw Exception(
176 "Expected exactly one connection for shard " + toString(shard_info.shard_num), ErrorCodes::LOGICAL_ERROR);
177
178 /// INSERT SELECT query returns empty block
179 auto in_stream = std::make_shared<RemoteBlockInputStream>(std::move(connections), new_query_str, Block{}, context);
180 in_streams.push_back(in_stream);
181 }
182 out_streams.push_back(std::make_shared<NullBlockOutputStream>(Block()));
183 }
184 }
185 }
186
187 if (!is_distributed_insert_select)
188 {
189 size_t out_streams_size = 1;
190 if (query.select)
191 {
192 /// Passing 1 as subquery_depth will disable limiting size of intermediate result.
193 InterpreterSelectWithUnionQuery interpreter_select{ query.select, context, SelectQueryOptions(QueryProcessingStage::Complete, 1)};
194
195 if (table->supportsParallelInsert() && settings.max_insert_threads > 1)
196 {
197 in_streams = interpreter_select.executeWithMultipleStreams(res.pipeline);
198 out_streams_size = std::min(size_t(settings.max_insert_threads), in_streams.size());
199 }
200 else
201 {
202 res = interpreter_select.execute();
203 in_streams.emplace_back(res.in);
204 res.in = nullptr;
205 res.out = nullptr;
206 }
207 }
208
209 for (size_t i = 0; i < out_streams_size; i++)
210 {
211 /// We create a pipeline of several streams, into which we will write data.
212 BlockOutputStreamPtr out;
213
214 /// NOTE: we explicitly ignore bound materialized views when inserting into Kafka Storage.
215 /// Otherwise we'll get duplicates when MV reads same rows again from Kafka.
216 if (table->noPushingToViews() && !no_destination)
217 out = table->write(query_ptr, context);
218 else
219 out = std::make_shared<PushingToViewsBlockOutputStream>(table, context, query_ptr, no_destination);
220
221 /// Do not squash blocks if it is a sync INSERT into Distributed, since it lead to double bufferization on client and server side.
222 /// Client-side bufferization might cause excessive timeouts (especially in case of big blocks).
223 if (!(context.getSettingsRef().insert_distributed_sync && table->isRemote()) && !no_squash)
224 {
225 out = std::make_shared<SquashingBlockOutputStream>(
226 out,
227 out->getHeader(),
228 context.getSettingsRef().min_insert_block_size_rows,
229 context.getSettingsRef().min_insert_block_size_bytes);
230 }
231
232 /// Actually we don't know structure of input blocks from query/table,
233 /// because some clients break insertion protocol (columns != header)
234 out = std::make_shared<AddingDefaultBlockOutputStream>(
235 out, query_sample_block, out->getHeader(), table->getColumns().getDefaults(), context);
236
237 if (const auto & constraints = table->getConstraints(); !constraints.empty())
238 out = std::make_shared<CheckConstraintsBlockOutputStream>(
239 query.table_id, out, query_sample_block, table->getConstraints(), context);
240
241 auto out_wrapper = std::make_shared<CountingBlockOutputStream>(out);
242 out_wrapper->setProcessListElement(context.getProcessListElement());
243 out = std::move(out_wrapper);
244 out_streams.emplace_back(std::move(out));
245 }
246 }
247
248 /// What type of query: INSERT or INSERT SELECT?
249 if (query.select)
250 {
251 for (auto & in_stream : in_streams)
252 {
253 in_stream = std::make_shared<ConvertingBlockInputStream>(
254 context, in_stream, out_streams.at(0)->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Position);
255 }
256
257 Block in_header = in_streams.at(0)->getHeader();
258 if (in_streams.size() > 1)
259 {
260 for (size_t i = 1; i < in_streams.size(); ++i)
261 assertBlocksHaveEqualStructure(in_streams[i]->getHeader(), in_header, "INSERT SELECT");
262 }
263
264 res.in = std::make_shared<NullAndDoCopyBlockInputStream>(in_streams, out_streams);
265
266 if (!allow_materialized)
267 {
268 for (const auto & column : table->getColumns())
269 if (column.default_desc.kind == ColumnDefaultKind::Materialized && in_header.has(column.name))
270 throw Exception("Cannot insert column " + column.name + ", because it is MATERIALIZED column.", ErrorCodes::ILLEGAL_COLUMN);
271 }
272 }
273 else if (query.data && !query.has_tail) /// can execute without additional data
274 {
275 // res.out = std::move(out_streams.at(0));
276 res.in = std::make_shared<InputStreamFromASTInsertQuery>(query_ptr, nullptr, query_sample_block, context, nullptr);
277 res.in = std::make_shared<NullAndDoCopyBlockInputStream>(res.in, out_streams.at(0));
278 }
279 else
280 res.out = std::move(out_streams.at(0));
281 res.pipeline.addStorageHolder(table);
282
283 return res;
284}
285
286
287StorageID InterpreterInsertQuery::getDatabaseTable() const
288{
289 return query_ptr->as<ASTInsertQuery &>().table_id;
290}
291
292}