Branch data Line data Source code
1 : : #include <Core/ProtocolDefines.h>
2 : : #include <IO/ReadBuffer.h>
3 : : #include <IO/ReadHelpers.h>
4 : : #include <IO/WriteHelpers.h>
5 : : #include <Interpreters/ClientInfo.h>
6 : : #include <base/getFQDNOrHostName.h>
7 : : #include <Poco/Net/HTTPRequest.h>
8 : : #include <Poco/Net/SocketAddress.h>
9 : :
10 : : #include <Common/config_version.h>
11 : :
12 : : #include <boost/algorithm/string/trim.hpp>
13 : : #include <fmt/format.h>
14 : : #include <unistd.h>
15 : :
16 : :
17 : : namespace DB
18 : : {
19 : :
20 : : namespace ErrorCodes
21 : : {
22 : : extern const int LOGICAL_ERROR;
23 : : }
24 : :
25 : : ClientInfo::ClientInfo()
26 : 5370088 : {
27 : 5370088 : connection_address = std::make_shared<Poco::Net::SocketAddress>();
28 : 5370088 : current_address = std::make_shared<Poco::Net::SocketAddress>();
29 : 5370088 : initial_address = std::make_shared<Poco::Net::SocketAddress>();
30 : 5370088 : }
31 : :
32 : : std::optional<Poco::Net::SocketAddress> ClientInfo::getLastForwardedFor() const
33 : 5858833 : {
34 [ + + ]: 5858833 : if (forwarded_for.empty())
35 : 5858208 : return {};
36 : 625 : String last = forwarded_for.substr(forwarded_for.find_last_of(',') + 1);
37 : 625 : boost::trim(last);
38 : :
39 : : /// IPv6 address with port
40 [ + + ]: 625 : if (last[0] == '[')
41 : 68 : return Poco::Net::SocketAddress{Poco::Net::AddressFamily::IPv6, last};
42 : :
43 : 557 : const auto colons = std::count(last.begin(), last.end(), ':');
44 : :
45 : : /// IPv6 address without port
46 [ + + ]: 557 : if (colons > 1)
47 : 68 : return Poco::Net::SocketAddress{Poco::Net::AddressFamily::IPv6, last, 0};
48 : :
49 : : /// IPv4 address with port
50 [ + + ]: 489 : if (colons == 1)
51 : 185 : return Poco::Net::SocketAddress{Poco::Net::AddressFamily::IPv4, last};
52 : :
53 : : /// IPv4 address without port
54 : 304 : return Poco::Net::SocketAddress{Poco::Net::AddressFamily::IPv4, last, 0};
55 : 489 : }
56 : :
57 : : String ClientInfo::getLastForwardedForHost() const
58 : 5368000 : {
59 : 5368000 : auto addr = getLastForwardedFor();
60 [ + + ]: 5368000 : return addr ? addr->host().toString() : "";
61 : 5368000 : }
62 : :
63 : :
64 : : void ClientInfo::write(WriteBuffer & out, UInt64 server_protocol_revision) const
65 : 708508 : {
66 [ - + ]: 708508 : if (server_protocol_revision < DBMS_MIN_REVISION_WITH_CLIENT_INFO)
67 : 0 : throw Exception(ErrorCodes::LOGICAL_ERROR, "Method ClientInfo::write is called for unsupported server revision");
68 : :
69 : 708508 : writeBinary(static_cast<UInt8>(query_kind), out);
70 [ + + ]: 708508 : if (empty())
71 : 3 : return;
72 : :
73 : 708505 : writeBinary(initial_user, out);
74 : 708505 : writeBinary(initial_query_id, out);
75 : 708505 : writeBinary(initial_address->toString(), out);
76 : :
77 [ + + ]: 708505 : if (server_protocol_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_INITIAL_QUERY_START_TIME)
78 : 708502 : writeBinary(initial_query_start_time_microseconds, out);
79 : :
80 : 708505 : writeBinary(static_cast<UInt8>(interface), out);
81 : :
82 [ + + ]: 708505 : if (interface == Interface::TCP)
83 : 708205 : {
84 : 708205 : writeBinary(os_user, out);
85 : 708205 : writeBinary(client_hostname, out);
86 : 708205 : writeBinary(client_name, out);
87 : 708205 : writeVarUInt(client_version_major, out);
88 : 708205 : writeVarUInt(client_version_minor, out);
89 : 708205 : writeVarUInt(client_tcp_protocol_version, out);
90 : 708205 : }
91 [ + + ]: 300 : else if (interface == Interface::HTTP)
92 : 220 : {
93 : 220 : writeBinary(static_cast<UInt8>(http_method), out);
94 : 220 : writeBinary(http_user_agent, out);
95 : :
96 [ + + ]: 220 : if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_X_FORWARDED_FOR_IN_CLIENT_INFO)
97 : 219 : writeBinary(forwarded_for, out);
98 : :
99 [ + + ]: 220 : if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_REFERER_IN_CLIENT_INFO)
100 : 219 : writeBinary(http_referer, out);
101 : 220 : }
102 : :
103 [ + + ]: 708505 : if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO)
104 : 708521 : writeBinary(quota_key, out);
105 : :
106 [ + + ]: 708505 : if (server_protocol_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH)
107 : 708477 : writeVarUInt(distributed_depth, out);
108 : :
109 [ + + ]: 708505 : if (interface == Interface::TCP)
110 : 708185 : {
111 [ + + ]: 708185 : if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH)
112 : 708187 : writeVarUInt(client_version_patch, out);
113 : 708185 : }
114 : :
115 [ + + ]: 708505 : if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_OPENTELEMETRY)
116 : 708517 : {
117 [ + + ]: 708517 : if (client_trace_context.trace_id != UUID())
118 : 496 : {
119 : : // Have OpenTelemetry header.
120 : 496 : writeBinary(uint8_t(1), out);
121 : : // No point writing these numbers with variable length, because they
122 : : // are random and will probably require the full length anyway.
123 : 496 : writeBinary(client_trace_context.trace_id, out);
124 : 496 : writeBinary(client_trace_context.span_id, out);
125 : 496 : writeBinary(client_trace_context.tracestate, out);
126 : 496 : writeBinary(client_trace_context.trace_flags, out);
127 : 496 : }
128 : 708021 : else
129 : 708021 : {
130 : : // Don't have OpenTelemetry header.
131 : 708021 : writeBinary(static_cast<UInt8>(0), out);
132 : 708021 : }
133 : 708517 : }
134 : :
135 [ + + ]: 708505 : if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS)
136 : 708497 : {
137 : 708497 : writeVarUInt(static_cast<UInt64>(collaborate_with_initiator), out);
138 : 708497 : writeVarUInt(obsolete_count_participating_replicas, out);
139 : 708497 : writeVarUInt(number_of_current_replica, out);
140 : 708497 : }
141 : :
142 [ + + ]: 708505 : if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_QUERY_AND_LINE_NUMBERS)
143 : 708485 : {
144 : 708485 : writeVarUInt(script_query_number, out);
145 : 708485 : writeVarUInt(script_line_number, out);
146 : 708485 : }
147 : :
148 [ + + ]: 708505 : if (server_protocol_revision >= DBMS_MIN_REVISON_WITH_JWT_IN_INTERSERVER)
149 : 708479 : {
150 [ - + ]: 708479 : if (!jwt.empty())
151 : 0 : {
152 : 0 : writeBinary(static_cast<UInt8>(1), out);
153 : 0 : writeBinary(jwt, out);
154 : 0 : }
155 : 708479 : else
156 : 708479 : writeBinary(static_cast<UInt8>(0), out);
157 : 708479 : }
158 : 708505 : }
159 : :
160 : :
161 : : void ClientInfo::read(ReadBuffer & in, UInt64 client_protocol_revision)
162 : 773488 : {
163 [ - + ]: 773488 : if (client_protocol_revision < DBMS_MIN_REVISION_WITH_CLIENT_INFO)
164 : 0 : throw Exception(ErrorCodes::LOGICAL_ERROR, "Method ClientInfo::read is called for unsupported client revision");
165 : :
166 : 773488 : UInt8 read_query_kind = 0;
167 : 773488 : readBinary(read_query_kind, in);
168 : 773488 : query_kind = QueryKind(read_query_kind);
169 [ + + ]: 773488 : if (empty())
170 : 7 : return;
171 : :
172 : 773481 : readBinary(initial_user, in);
173 : 773481 : readBinary(initial_query_id, in);
174 : :
175 : 773481 : String initial_address_string;
176 : 773481 : readBinary(initial_address_string, in);
177 : 773481 : initial_address = std::make_shared<Poco::Net::SocketAddress>(initial_address_string);
178 : :
179 [ + + ]: 773481 : if (client_protocol_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_INITIAL_QUERY_START_TIME)
180 : 773452 : {
181 : 773452 : readBinary(initial_query_start_time_microseconds, in);
182 : 773452 : initial_query_start_time = initial_query_start_time_microseconds / 1000000;
183 : 773452 : }
184 : :
185 : 773481 : UInt8 read_interface = 0;
186 : 773481 : readBinary(read_interface, in);
187 : 773481 : interface = Interface(read_interface);
188 : :
189 [ + + ]: 773481 : if (interface == Interface::TCP)
190 : 773091 : {
191 : 773091 : readBinary(os_user, in);
192 : 773091 : readBinary(client_hostname, in);
193 : 773091 : readBinary(client_name, in);
194 : 773091 : readVarUInt(client_version_major, in);
195 : 773091 : readVarUInt(client_version_minor, in);
196 : 773091 : readVarUInt(client_tcp_protocol_version, in);
197 : 773091 : }
198 [ + + ]: 390 : else if (interface == Interface::HTTP)
199 : 220 : {
200 : 220 : UInt8 read_http_method = 0;
201 : 220 : readBinary(read_http_method, in);
202 : 220 : http_method = HTTPMethod(read_http_method);
203 : :
204 : 220 : readBinary(http_user_agent, in);
205 : :
206 [ + - ]: 220 : if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_X_FORWARDED_FOR_IN_CLIENT_INFO)
207 : 220 : readBinary(forwarded_for, in);
208 : :
209 [ + - ]: 220 : if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_REFERER_IN_CLIENT_INFO)
210 : 220 : readBinary(http_referer, in);
211 : 220 : }
212 : :
213 [ + + ]: 773481 : if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO)
214 : 773368 : readBinary(quota_key, in);
215 : :
216 [ + + ]: 773481 : if (client_protocol_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH)
217 : 773415 : readVarUInt(distributed_depth, in);
218 : :
219 [ + + ]: 773481 : if (interface == Interface::TCP)
220 : 773027 : {
221 [ + + ]: 773027 : if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH)
222 : 773035 : readVarUInt(client_version_patch, in);
223 : >1844*10^16 : else
224 : >1844*10^16 : client_version_patch = client_tcp_protocol_version;
225 : 773027 : }
226 : :
227 [ + + ]: 773481 : if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_OPENTELEMETRY)
228 : 773316 : {
229 : 773316 : uint8_t have_trace_id = 0;
230 : 773316 : readBinary(have_trace_id, in);
231 [ + + ]: 773316 : if (have_trace_id)
232 : 527 : {
233 : 527 : readBinary(client_trace_context.trace_id, in);
234 : 527 : readBinary(client_trace_context.span_id, in);
235 : 527 : readBinary(client_trace_context.tracestate, in);
236 : 527 : readBinary(client_trace_context.trace_flags, in);
237 : 527 : }
238 : 773316 : }
239 : :
240 [ + + ]: 773481 : if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS)
241 : 773341 : {
242 : 773341 : UInt64 value;
243 : 773341 : readVarUInt(value, in);
244 : 773341 : collaborate_with_initiator = static_cast<bool>(value);
245 : 773341 : readVarUInt(obsolete_count_participating_replicas, in);
246 : 773341 : readVarUInt(number_of_current_replica, in);
247 : 773341 : }
248 : :
249 [ + + ]: 773481 : if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_QUERY_AND_LINE_NUMBERS)
250 : 773177 : {
251 : 773177 : readVarUInt(script_query_number, in);
252 : 773177 : readVarUInt(script_line_number, in);
253 : 773177 : }
254 : :
255 [ + + ]: 773481 : if (client_protocol_revision >= DBMS_MIN_REVISON_WITH_JWT_IN_INTERSERVER)
256 : 773191 : {
257 : 773191 : UInt8 have_jwt = 0;
258 : 773191 : readBinary(have_jwt, in);
259 [ - + ]: 773191 : if (have_jwt)
260 : 0 : readBinary(jwt, in);
261 : 773191 : }
262 : 773481 : }
263 : :
264 : :
265 : : void ClientInfo::setInitialQuery()
266 : 117020 : {
267 : 117020 : query_kind = QueryKind::INITIAL_QUERY;
268 : 117020 : fillOSUserHostNameAndVersionInfo();
269 [ + + ]: 117020 : if (client_name.empty())
270 : 3677 : client_name = VERSION_NAME;
271 : 113343 : else
272 : 113343 : client_name = std::string(VERSION_NAME) + " " + client_name;
273 : 117020 : }
274 : :
275 : : bool ClientInfo::clientVersionEquals(const ClientInfo & other, bool compare_patch) const
276 : 660422 : {
277 [ - + ]: 660422 : bool patch_equals = compare_patch ? client_version_patch == other.client_version_patch : true;
278 [ + + ]: 660422 : return client_version_major == other.client_version_major &&
279 [ + + ]: 660442 : client_version_minor == other.client_version_minor &&
280 [ + - ]: 660443 : patch_equals &&
281 [ + + ]: 660443 : client_tcp_protocol_version == other.client_tcp_protocol_version;
282 : 660422 : }
283 : :
284 : : String ClientInfo::getVersionStr() const
285 : 772539 : {
286 : 772539 : return fmt::format("{}.{}.{} ({})", client_version_major, client_version_minor, client_version_patch, client_tcp_protocol_version);
287 : 772539 : }
288 : :
289 : : void ClientInfo::fillOSUserHostNameAndVersionInfo()
290 : 117020 : {
291 : 117020 : os_user.resize(256, '\0');
292 [ - + ]: 117020 : if (0 == getlogin_r(os_user.data(), static_cast<int>(os_user.size() - 1)))
293 : 0 : os_user.resize(strlen(os_user.c_str()));
294 : 117020 : else
295 : 117020 : os_user.clear(); /// Don't mind if we cannot determine user login.
296 : :
297 : 117020 : client_hostname = getFQDNOrHostName();
298 : :
299 : 117020 : client_version_major = VERSION_MAJOR;
300 : 117020 : client_version_minor = VERSION_MINOR;
301 : 117020 : client_version_patch = VERSION_PATCH;
302 : 117020 : client_tcp_protocol_version = DBMS_TCP_PROTOCOL_VERSION;
303 : 117020 : }
304 : :
305 : : String toString(ClientInfo::Interface interface)
306 : 0 : {
307 [ # # ]: 0 : switch (interface)
308 : 0 : {
309 [ # # ]: 0 : case ClientInfo::Interface::TCP:
310 : 0 : return "TCP";
311 [ # # ]: 0 : case ClientInfo::Interface::HTTP:
312 : 0 : return "HTTP";
313 [ # # ]: 0 : case ClientInfo::Interface::GRPC:
314 : 0 : return "GRPC";
315 [ # # ]: 0 : case ClientInfo::Interface::MYSQL:
316 : 0 : return "MYSQL";
317 [ # # ]: 0 : case ClientInfo::Interface::POSTGRESQL:
318 : 0 : return "POSTGRESQL";
319 [ # # ]: 0 : case ClientInfo::Interface::LOCAL:
320 : 0 : return "LOCAL";
321 [ # # ]: 0 : case ClientInfo::Interface::TCP_INTERSERVER:
322 : 0 : return "TCP_INTERSERVER";
323 [ # # ]: 0 : case ClientInfo::Interface::PROMETHEUS:
324 : 0 : return "PROMETHEUS";
325 [ # # ]: 0 : case ClientInfo::Interface::BACKGROUND:
326 : 0 : return "BACKGROUND";
327 [ # # ]: 0 : case ClientInfo::Interface::ARROW_FLIGHT:
328 : 0 : return "ARROWFLIGHT";
329 : 0 : }
330 : :
331 : 0 : return fmt::format("Unknown server interface ({}).", static_cast<int>(interface));
332 : 0 : }
333 : :
334 : : void ClientInfo::setFromHTTPRequest(const Poco::Net::HTTPRequest & request)
335 : 301980 : {
336 : 301980 : http_method = ClientInfo::HTTPMethod::UNKNOWN;
337 [ + + ]: 301980 : if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET)
338 : 5976 : http_method = ClientInfo::HTTPMethod::GET;
339 [ + + ]: 296004 : else if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST)
340 : 295129 : http_method = ClientInfo::HTTPMethod::POST;
341 : :
342 : 301980 : http_user_agent = request.get("User-Agent", "");
343 : 301980 : http_referer = request.get("Referer", "");
344 : 301980 : forwarded_for = request.get("X-Forwarded-For", "");
345 : :
346 [ + + ]: 301980 : for (const auto & header : request)
347 : 1045616 : {
348 : : /// These headers can contain authentication info and shouldn't be accessible by the user.
349 : 1045616 : String key_lowercase = Poco::toLower(header.first);
350 [ + + ][ + + ]: 1045616 : if (key_lowercase.starts_with("x-clickhouse") || key_lowercase == "authentication")
351 : 266 : continue;
352 : 1045350 : http_headers[header.first] = header.second;
353 : 1045350 : }
354 : 301980 : }
355 : :
356 : : String toString(ClientInfo::HTTPMethod method)
357 : 0 : {
358 [ # # ]: 0 : switch (method)
359 : 0 : {
360 [ # # ]: 0 : case ClientInfo::HTTPMethod::UNKNOWN:
361 : 0 : return "UNKNOWN";
362 [ # # ]: 0 : case ClientInfo::HTTPMethod::GET:
363 : 0 : return "GET";
364 [ # # ]: 0 : case ClientInfo::HTTPMethod::POST:
365 : 0 : return "POST";
366 [ # # ]: 0 : case ClientInfo::HTTPMethod::OPTIONS:
367 : 0 : return "OPTIONS";
368 : 0 : }
369 : 0 : }
370 : :
371 : : }
|