Hot spotting

Hot spotting

Computer hot spotting) may occur in Elasticsearch when resource utilizations are unevenly distributed across nodes. Temporary spikes are not usually considered problematic, but ongoing significantly unique utilization may lead to cluster bottlenecks and should be reviewed.

Detect hot spotting

Hot spotting most commonly surfaces as significantly elevated resource utilization (of disk.percent, heap.percent, or cpu) among a subset of nodes as reported via cat nodes. Individual spikes aren’t necessarily problematic, but if utilization repeatedly spikes or consistently remains high over time (for example longer than 30 seconds), the resource may be experiencing problematic hot spotting.

For example, let’s show case two separate plausible issues using cat nodes:

  1. resp = client.cat.nodes(
  2. v=True,
  3. s="master,name",
  4. h="name,master,node.role,heap.percent,disk.used_percent,cpu",
  5. )
  6. print(resp)
  1. response = client.cat.nodes(
  2. v: true,
  3. s: 'master,name',
  4. h: 'name,master,node.role,heap.percent,disk.used_percent,cpu'
  5. )
  6. puts response
  1. const response = await client.cat.nodes({
  2. v: "true",
  3. s: "master,name",
  4. h: "name,master,node.role,heap.percent,disk.used_percent,cpu",
  5. });
  6. console.log(response);
  1. GET _cat/nodes?v&s=master,name&h=name,master,node.role,heap.percent,disk.used_percent,cpu

Pretend this same output pulled twice across five minutes:

  1. name master node.role heap.percent disk.used_percent cpu
  2. node_1 * hirstm 24 20 95
  3. node_2 - hirstm 23 18 18
  4. node_3 - hirstmv 25 90 10

Here we see two significantly unique utilizations: where the master node is at cpu: 95 and a hot node is at disk.used_percent: 90%. This would indicate hot spotting was occurring on these two nodes, and not necessarily from the same root cause.

Causes

Historically, clusters experience hot spotting mainly as an effect of hardware, shard distributions, and/or task load. We’ll review these sequentially in order of their potentially impacting scope.

Hardware

Here are some common improper hardware setups which may contribute to hot spotting:

  • Resources are allocated non-uniformly. For example, if one hot node is given half the CPU of its peers. Elasticsearch expects all nodes on a data tier to share the same hardware profiles or specifications.
  • Resources are consumed by another service on the host, including other Elasticsearch nodes. Refer to our dedicated host recommendation.
  • Resources experience different network or disk throughputs. For example, if one node’s I/O is lower than its peers. Refer to Use faster hardware for more information.
  • A JVM that has been configured with a heap larger than 31GB. Refer to Set the JVM heap size for more information.
  • Problematic resources uniquely report memory swapping.

Shard distributions

Elasticsearch indices are divided into one or more shards) which can sometimes be poorly distributed. Elasticsearch accounts for this by balancing shard counts across data nodes. As introduced in version 8.6, Elasticsearch by default also enables desired balancing to account for ingest load. A node may still experience hot spotting either due to write-heavy indices or by the overall shards it’s hosting.

Node level

You can check for shard balancing via cat allocation, though as of version 8.6, desired balancing may no longer fully expect to balance shards. Kindly note, both methods may temporarily show problematic imbalance during cluster stability issues.

For example, let’s showcase two separate plausible issues using cat allocation:

  1. resp = client.cat.allocation(
  2. v=True,
  3. s="node",
  4. h="node,shards,disk.percent,disk.indices,disk.used",
  5. )
  6. print(resp)
  1. response = client.cat.allocation(
  2. v: true,
  3. s: 'node',
  4. h: 'node,shards,disk.percent,disk.indices,disk.used'
  5. )
  6. puts response
  1. const response = await client.cat.allocation({
  2. v: "true",
  3. s: "node",
  4. h: "node,shards,disk.percent,disk.indices,disk.used",
  5. });
  6. console.log(response);
  1. GET _cat/allocation?v&s=node&h=node,shards,disk.percent,disk.indices,disk.used

Which could return:

  1. node shards disk.percent disk.indices disk.used
  2. node_1 446 19 154.8gb 173.1gb
  3. node_2 31 52 44.6gb 372.7gb
  4. node_3 445 43 271.5gb 289.4gb

Here we see two significantly unique situations. node_2 has recently restarted, so it has a much lower number of shards than all other nodes. This also relates to disk.indices being much smaller than disk.used while shards are recovering as seen via cat recovery. While node_2‘s shard count is low, it may become a write hot spot due to ongoing ILM rollovers. This is a common root cause of write hot spots covered in the next section.

The second situation is that node_3 has a higher disk.percent than node_1, even though they hold roughly the same number of shards. This occurs when either shards are not evenly sized (refer to Aim for shards of up to 200M documents, or with sizes between 10GB and 50GB) or when there are a lot of empty indices.

Cluster rebalancing based on desired balance does much of the heavy lifting of keeping nodes from hot spotting. It can be limited by either nodes hitting watermarks (refer to fixing disk watermark errors) or by a write-heavy index’s total shards being much lower than the written-to nodes.

You can confirm hot spotted nodes via the nodes stats API, potentially polling twice over time to only checking for the stats differences between them rather than polling once giving you stats for the node’s full node uptime. For example, to check all nodes indexing stats:

  1. resp = client.nodes.stats(
  2. human=True,
  3. filter_path="nodes.*.name,nodes.*.indices.indexing",
  4. )
  5. print(resp)
  1. response = client.nodes.stats(
  2. human: true,
  3. filter_path: 'nodes.*.name,nodes.*.indices.indexing'
  4. )
  5. puts response
  1. const response = await client.nodes.stats({
  2. human: "true",
  3. filter_path: "nodes.*.name,nodes.*.indices.indexing",
  4. });
  5. console.log(response);
  1. GET _nodes/stats?human&filter_path=nodes.*.name,nodes.*.indices.indexing
Index level

Hot spotted nodes frequently surface via cat thread pool‘s write and search queue backups. For example:

  1. resp = client.cat.thread_pool(
  2. thread_pool_patterns="write,search",
  3. v=True,
  4. s="n,nn",
  5. h="n,nn,q,a,r,c",
  6. )
  7. print(resp)
  1. response = client.cat.thread_pool(
  2. thread_pool_patterns: 'write,search',
  3. v: true,
  4. s: 'n,nn',
  5. h: 'n,nn,q,a,r,c'
  6. )
  7. puts response
  1. const response = await client.cat.threadPool({
  2. thread_pool_patterns: "write,search",
  3. v: "true",
  4. s: "n,nn",
  5. h: "n,nn,q,a,r,c",
  6. });
  7. console.log(response);
  1. GET _cat/thread_pool/write,search?v=true&s=n,nn&h=n,nn,q,a,r,c

Which could return:

  1. n nn q a r c
  2. search node_1 3 1 0 1287
  3. search node_2 0 2 0 1159
  4. search node_3 0 1 0 1302
  5. write node_1 100 3 0 4259
  6. write node_2 0 4 0 980
  7. write node_3 1 5 0 8714

Here you can see two significantly unique situations. Firstly, node_1 has a severely backed up write queue compared to other nodes. Secondly, node_3 shows historically completed writes that are double any other node. These are both probably due to either poorly distributed write-heavy indices, or to multiple write-heavy indices allocated to the same node. Since primary and replica writes are majorly the same amount of cluster work, we usually recommend setting index.routing.allocation.total_shards_per_node to force index spreading after lining up index shard counts to total nodes.

We normally recommend heavy-write indices have sufficient primary number_of_shards and replica number_of_replicas to evenly spread across indexing nodes. Alternatively, you can reroute shards to more quiet nodes to alleviate the nodes with write hot spotting.

If it’s non-obvious what indices are problematic, you can introspect further via the index stats API by running:

  1. resp = client.indices.stats(
  2. level="shards",
  3. human=True,
  4. expand_wildcards="all",
  5. filter_path="indices.*.total.indexing.index_total",
  6. )
  7. print(resp)
  1. response = client.indices.stats(
  2. level: 'shards',
  3. human: true,
  4. expand_wildcards: 'all',
  5. filter_path: 'indices.*.total.indexing.index_total'
  6. )
  7. puts response
  1. const response = await client.indices.stats({
  2. level: "shards",
  3. human: "true",
  4. expand_wildcards: "all",
  5. filter_path: "indices.*.total.indexing.index_total",
  6. });
  7. console.log(response);
  1. GET _stats?level=shards&human&expand_wildcards=all&filter_path=indices.*.total.indexing.index_total

For more advanced analysis, you can poll for shard-level stats, which lets you compare joint index-level and node-level stats. This analysis wouldn’t account for node restarts and/or shards rerouting, but serves as overview:

  1. resp = client.indices.stats(
  2. metric="indexing,search",
  3. level="shards",
  4. human=True,
  5. expand_wildcards="all",
  6. )
  7. print(resp)
  1. response = client.indices.stats(
  2. metric: 'indexing,search',
  3. level: 'shards',
  4. human: true,
  5. expand_wildcards: 'all'
  6. )
  7. puts response
  1. const response = await client.indices.stats({
  2. metric: "indexing,search",
  3. level: "shards",
  4. human: "true",
  5. expand_wildcards: "all",
  6. });
  7. console.log(response);
  1. GET _stats/indexing,search?level=shards&human&expand_wildcards=all

You can for example use the third-party JQ tool, to process the output saved as indices_stats.json:

  1. cat indices_stats.json | jq -rc ['.indices|to_entries[]|.key as $i|.value.shards|to_entries[]|.key as $s|.value[]|{node:.routing.node[:4], index:$i, shard:$s, primary:.routing.primary, size:.store.size, total_indexing:.indexing.index_total, time_indexing:.indexing.index_time_in_millis, total_query:.search.query_total, time_query:.search.query_time_in_millis } | .+{ avg_indexing: (if .total_indexing>0 then (.time_indexing/.total_indexing|round) else 0 end), avg_search: (if .total_search>0 then (.time_search/.total_search|round) else 0 end) }'] > shard_stats.json
  2. # show top written-to shard simplified stats which contain their index and node references
  3. cat shard_stats.json | jq -rc 'sort_by(-.avg_indexing)[]' | head

Task loads

Shard distribution problems will most-likely surface as task load as seen above in the cat thread pool example. It is also possible for tasks to hot spot a node either due to individual qualitative expensiveness or overall quantitative traffic loads.

For example, if cat thread pool reported a high queue on the warmer thread pool, you would look-up the effected node’s hot threads. Let’s say it reported warmer threads at 100% cpu related to GlobalOrdinalsBuilder. This would let you know to inspect field data’s global ordinals.

Alternatively, let’s say cat nodes shows a hot spotted master node and cat thread pool shows general queuing across nodes. This would suggest the master node is overwhelmed. To resolve this, first ensure hardware high availability setup and then look to ephemeral causes. In this example, the nodes hot threads API reports multiple threads in other which indicates they’re waiting on or blocked by either garbage collection or I/O.

For either of these example situations, a good way to confirm the problematic tasks is to look at longest running non-continuous (designated [c]) tasks via cat task management. This can be supplemented checking longest running cluster sync tasks via cat pending tasks. Using a third example,

  1. resp = client.cat.tasks(
  2. v=True,
  3. s="time:desc",
  4. h="type,action,running_time,node,cancellable",
  5. )
  6. print(resp)
  1. response = client.cat.tasks(
  2. v: true,
  3. s: 'time:desc',
  4. h: 'type,action,running_time,node,cancellable'
  5. )
  6. puts response
  1. const response = await client.cat.tasks({
  2. v: "true",
  3. s: "time:desc",
  4. h: "type,action,running_time,node,cancellable",
  5. });
  6. console.log(response);
  1. GET _cat/tasks?v&s=time:desc&h=type,action,running_time,node,cancellable

This could return:

  1. type action running_time node cancellable
  2. direct indices:data/read/eql 10m node_1 true
  3. ...

This surfaces a problematic EQL query. We can gain further insight on it via the task management API,

  1. resp = client.tasks.list(
  2. human=True,
  3. detailed=True,
  4. )
  5. print(resp)
  1. response = client.tasks.list(
  2. human: true,
  3. detailed: true
  4. )
  5. puts response
  1. const response = await client.tasks.list({
  2. human: "true",
  3. detailed: "true",
  4. });
  5. console.log(response);
  1. GET _tasks?human&detailed

Its response contains a description that reports this query:

  1. indices[winlogbeat-*,logs-window*], sequence by winlog.computer_name with maxspan=1m\n\n[authentication where host.os.type == "windows" and event.action:"logged-in" and\n event.outcome == "success" and process.name == "svchost.exe" ] by winlog.event_data.TargetLogonId

This lets you know which indices to check (winlogbeat-*,logs-window*), as well as the EQL search request body. Most likely this is SIEM related. You can combine this with audit logging as needed to trace the request source.