Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ Changes for crash
Unreleased
==========

- Added ``\shards`` to show shard relocation progress. With optinal arguments
Comment thread
mannreis marked this conversation as resolved.
Outdated
``state`` and ``relocating``.

2026/02/09 0.32.0
=================

Expand Down
6 changes: 6 additions & 0 deletions docs/commands.rst
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ Every command starts with a ``\`` character.
| ``\r <FILENAME>`` | Reads statements from ``<FILENAME>`` and execute |
| | them. |
+------------------------+-----------------------------------------------------+
| ``\shards [VIEW]`` | Queries ``sys.shards`` table and computes relocation|
Comment thread
mannreis marked this conversation as resolved.
Outdated
| | progress progress per table. If ``VIEW`` is instead:|
| | |
| | - ``state`` provides aggregeration on shard state |
| | - ``relocating`` tracks which shards are relocated |

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
| | - ``state`` provides aggregeration on shard state |
| | - ``relocating`` tracks which shards are relocated |
| | - ``state`` provides aggregation on shard state |
| | - ``relocating`` tracks which shards are relocating |

+------------------------+-----------------------------------------------------+
| ``\sysinfo`` | Query the ``sys`` tables for system and cluster |
| | information. |
+------------------------+-----------------------------------------------------+
Expand Down
107 changes: 99 additions & 8 deletions src/crate/crash/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,104 @@ def __call__(self, cmd, check_name=None, **kwargs):
cmd.logger.warn('No check for {}'.format(check_name))


class ShardsCommand(Command):
"""shows progress of shards relocation (optional arguments: `state` and `relocating`)"""

DEFAULT_STMT = """
SELECT
table_name,
COUNT(*)
AS total_shards,
SUM(num_docs)
AS total_num_docs,
SUM(size)
As total_sum_shard_size,
SUM(CASE WHEN routing_state = 'RELOCATING' THEN 1 ELSE 0 END)
AS relocating_shards,
SUM(CASE WHEN routing_state = 'RELOCATING' THEN size ELSE 0 END)
AS relocating_size,
100.0 * SUM(CASE WHEN routing_state != 'RELOCATING' THEN size ELSE 0 END) / CAST(SUM(size) as DOUBLE)
AS relocated_percent
FROM sys.shards
WHERE routing_state != 'UNASSIGNED'
GROUP BY table_name
ORDER BY relocated_percent, table_name;
Comment thread
mannreis marked this conversation as resolved.
Outdated
"""

STATE_STMT = """
SELECT
routing_state,
COUNT(*)
AS shard_count,
SUM(num_docs)
AS num_docs,
SUM(size) / 1073741824.0
AS size_gb
FROM sys.shards
GROUP BY routing_state
ORDER BY routing_state;
Comment thread
mannreis marked this conversation as resolved.
Outdated
"""

RELOC_STMT = """
SELECT
table_name,
node['name'],
id,
recovery['stage'],
size,
routing_state,
state,
primary,
relocating_node,
size / 1024.0
AS size_kb,
partition_ident
FROM sys.shards
WHERE routing_state = 'RELOCATING'
ORDER BY table_name, id, node['name'];
"""
Comment thread
mannreis marked this conversation as resolved.
Outdated

OPTIONS = {
"state": STATE_STMT,
"relocating": RELOC_STMT,
}

def complete(self, cmd, text):
return (i for i in self.OPTIONS if i.startswith(text) or text.isspace())

def execute(self, cmd, stmt):
success = cmd._exec(stmt)
cmd.exit_code = cmd.exit_code or int(not success)
if not success:
cmd.logger.warn("FAILED")
return False

cur = cmd.cursor
shards = cur.fetchall()
cmd.pprint(shards, [c[0] for c in cur.description])
return True

def __call__(self, cmd, *args, **kwargs):
if len(args) == 0:
self.execute(cmd, self.DEFAULT_STMT)
return

stmt = self.OPTIONS.get(args[0].strip())
if stmt:
self.execute(cmd, stmt)
else:
cmd.logger.critical(f'Command argument not supported (available options: {", ".join(f"`{_a}`" for _a in self.OPTIONS.keys())}).')
return


built_in_commands = {
'?': HelpCommand(),
'r': ReadFileCommand(),
'format': SwitchFormatCommand(),
'autocomplete': ToggleAutocompleteCommand(),
'autocapitalize': ToggleAutoCapitalizeCommand(),
'verbose': ToggleVerboseCommand(),
'check': CheckCommand(),
'pager': SetPager(),
"?": HelpCommand(),
"r": ReadFileCommand(),
"format": SwitchFormatCommand(),
"autocomplete": ToggleAutocompleteCommand(),
"autocapitalize": ToggleAutoCapitalizeCommand(),
"verbose": ToggleVerboseCommand(),
"check": CheckCommand(),
"pager": SetPager(),
"shards": ShardsCommand(),
}
52 changes: 52 additions & 0 deletions tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
ClusterCheckCommand,
NodeCheckCommand,
ReadFileCommand,
ShardsCommand,
ToggleAutoCapitalizeCommand,
ToggleAutocompleteCommand,
ToggleVerboseCommand,
Expand Down Expand Up @@ -267,6 +268,57 @@ def test_check_command_with_node_check(self, cmd):
cmd.logger.info.assert_called_with('NODE CHECK OK')


class ShardsCommandTest(TestCase):
@patch('crate.crash.command.CrateShell')
def test_shards_command(self,cmd):
rows = [
['table1','1','10','1024','0','0','100.0'],
['table2','2','20','2048','1','1024','50.0'],
['table3','3','30','3072','2','2048','33.3'],
]
cols = [('table_name', ), (' total_shards', ), (' total_num_docs', ), (' total_sum_shard_size', ), (' relocating_shards', ), (' relocating_size', ), (' relocated_percent')]
cmd._exec.return_value = True
cmd.cursor.fetchall.return_value = rows
cmd.cursor.description = cols

ShardsCommand()(cmd)
cmd.pprint.assert_called_with(rows, [c[0] for c in cols])

@patch('crate.crash.command.CrateShell')
def test_shards_command_state(self,cmd):
rows = [
['RELOCATING','2','33334465','9.307963063940406'],
['STARTED','1010','166665535','26.309150873683393'],
]
cols = [('routing_state', ), ('shard_count', ), ('num_docs', ), ('size_gb', )]
cmd._exec.return_value = True
cmd.cursor.fetchall.return_value = rows
cmd.cursor.description = cols

ShardsCommand()(cmd,"state")
cmd.pprint.assert_called_with(rows, [c[0] for c in cols])

@patch('crate.crash.command.CrateShell')
def test_shards_command_relocating_none(self,cmd):
cmd._exec.return_value = True
cmd.cursor.fetchall.return_value = []
ShardsCommand()(cmd,"relocating")
cmd.logger.info.assert_not_called()

@patch('crate.crash.command.CrateShell')
def test_shards_command_relocating(self,cmd):
rows = [
['table1','node02','1','DONE','4997198327','RELOCATING','STARTED','TRUE','d63zx99jSfWjr5wUykEXWQ','4880076.4912109375',''],
['table1','node01','2','DONE','4997150911','RELOCATING','STARTED','TRUE','d63zx99jSfWjr5wUykEXWQ','4880030.1865234375',''],
]
cols = [('table_name',),('node[\'name\']',),('id',),('recovery[\'stage\']',),('size',),('routing_state',),('state',),('primary',),('relocating_node',),('size_kb',),('partition_ident',)]
cmd._exec.return_value = True
cmd.cursor.fetchall.return_value = rows
cmd.cursor.description = cols

ShardsCommand()(cmd,"state")
cmd.pprint.assert_called_with(rows, [c[0] for c in cols])

@patch('crate.client.connection.Cursor', fake_cursor())
class CommentsTest(TestCase):

Expand Down
113 changes: 113 additions & 0 deletions tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -653,10 +653,12 @@ def test_help_command(self):
'\\pager set an external pager. Use without argument to reset to internal paging',
'\\q quit crash',
'\\r read and execute statements from a file',
'\\shards shows progress of shards relocation (optional arguments: `state` and `relocating`)',
'\\sysinfo print system and cluster info',
'\\verbose toggle verbose mode',
])


help_ = command.commands['?']
self.assertTrue(isinstance(help_, Command))
self.assertEqual(expected, help_(command))
Expand Down Expand Up @@ -870,4 +872,115 @@ def test_connect_info_not_available(self, is_conn_available):
self.assertEqual(crash.connect_info.schema, None)


class ShardsCommandEmptyDBTest(TestCase):
def setUp(self):
node.reset()

def test_shards_command(self):
Comment thread
mannreis marked this conversation as resolved.
Outdated
expected = '\n'.join([
'+------------+--------------+----------------+----------------------+-------------------+-----------------+-------------------+',
'| table_name | total_shards | total_num_docs | total_sum_shard_size | relocating_shards | relocating_size | relocated_percent |',
'+------------+--------------+----------------+----------------------+-------------------+-----------------+-------------------+',
'+------------+--------------+----------------+----------------------+-------------------+-----------------+-------------------+\n',
])
with CrateShell(crate_hosts=[node.http_url], is_tty=False) as cmd:
shards_ = cmd.commands['shards']
with patch('sys.stdout', new_callable=StringIO) as output:
text = shards_(cmd)
self.assertEqual(None, text)
self.assertEqual(expected, output.getvalue())

def test_shards_command_state(self):
expected = '\n'.join([
'+---------------+-------------+----------+---------+',
'| routing_state | shard_count | num_docs | size_gb |',
'+---------------+-------------+----------+---------+',
'+---------------+-------------+----------+---------+\n',
])
with CrateShell(crate_hosts=[node.http_url], is_tty=False) as cmd:
shards_ = cmd.commands['shards']
with patch('sys.stdout', new_callable=StringIO) as output:
text = shards_(cmd, 'state')
self.assertEqual(None, text)
self.assertEqual(expected, output.getvalue())

def test_shards_command_relocating(self):
expected = '\n'.join([
'+------------+--------------+----+-------------------+------+---------------+-------+---------+-----------------+---------+-----------------+',
"| table_name | node['name'] | id | recovery['stage'] | size | routing_state | state | primary | relocating_node | size_kb | partition_ident |",
'+------------+--------------+----+-------------------+------+---------------+-------+---------+-----------------+---------+-----------------+',
'+------------+--------------+----+-------------------+------+---------------+-------+---------+-----------------+---------+-----------------+\n',
])
with CrateShell(crate_hosts=[node.http_url], is_tty=False) as cmd:
shards_ = cmd.commands['shards']
with patch('sys.stdout', new_callable=StringIO) as output:
text = shards_(cmd, 'relocating')
self.assertEqual(None, text)
self.assertEqual(expected, output.getvalue())

def test_shards_command_wrong_argument(self):
with CrateShell(crate_hosts=[node.http_url], is_tty=False) as cmd:
shards_ = cmd.commands['shards']
with patch('sys.stdout', new_callable=StringIO) as output:
cmd.logger = ColorPrinter(False, stream=output)
text = shards_(cmd, 'arg1', 'arg2')
self.assertEqual(None, text)
self.assertEqual('Command argument not supported (available options: `state`, `relocating`).\n', output.getvalue())



class ShardsCommandWithContentTest(TestCase):
def tearDown(self):
with CrateShell(crate_hosts=[node.http_url], is_tty=False) as cmd:
cmd.process('DROP TABLE IF EXISTS test_table;')

def setUp(self):
node.reset()
with CrateShell(crate_hosts=[node.http_url], is_tty=False) as cmd:
cmd.process('CREATE TABLE test_table (id INTEGER PRIMARY KEY, data STRING ) CLUSTERED INTO 10 SHARDS WITH (number_of_replicas = 0);\n')

def test_shards_command(self):
expected = '\n'.join([
'+------------+--------------+----------------+----------------------+-------------------+-----------------+-------------------+',
'| table_name | total_shards | total_num_docs | total_sum_shard_size | relocating_shards | relocating_size | relocated_percent |',
'+------------+--------------+----------------+----------------------+-------------------+-----------------+-------------------+',
'| test_table | 10 | 0 | dummy | 0 | 0 | 100.0 |',
'+------------+--------------+----------------+----------------------+-------------------+-----------------+-------------------+\n',
])
with CrateShell(crate_hosts=[node.http_url], is_tty=False) as cmd:
shards_ = cmd.commands['shards']
with patch('sys.stdout', new_callable=StringIO) as output:
text = shards_(cmd)
self.assertEqual(None, text)
self.assertEqual(len(expected.splitlines()), len(output.getvalue().splitlines()))

def test_shards_command_state(self):
expected = '\n'.join([
'+---------------+-------------+----------+-----------------------+',
'| routing_state | shard_count | num_docs | size_gb |',
'+---------------+-------------+----------+-----------------------+',
'| STARTED | 10 | 0 | 7.40AAAAAAAAAAAAAA-07 |',
'+---------------+-------------+----------+-----------------------+\n'
])
with CrateShell(crate_hosts=[node.http_url], is_tty=False) as cmd:
shards_ = cmd.commands['shards']
with patch('sys.stdout', new_callable=StringIO) as output:
text = shards_(cmd, 'state')
self.assertEqual(None, text)
self.assertEqual(len(expected.splitlines()), len(output.getvalue().splitlines()))

def test_shards_command_relocating(self):
expected = '\n'.join([
'+------------+--------------+----+-------------------+------+---------------+-------+---------+-----------------+---------+-----------------+',
"| table_name | node['name'] | id | recovery['stage'] | size | routing_state | state | primary | relocating_node | size_kb | partition_ident |",
'+------------+--------------+----+-------------------+------+---------------+-------+---------+-----------------+---------+-----------------+',
'+------------+--------------+----+-------------------+------+---------------+-------+---------+-----------------+---------+-----------------+\n',
])
with CrateShell(crate_hosts=[node.http_url], is_tty=False) as cmd:
shards_ = cmd.commands['shards']
with patch('sys.stdout', new_callable=StringIO) as output:
text = shards_(cmd, 'relocating')
self.assertEqual(None, text)
self.assertEqual(expected, output.getvalue())

setup_logging(level=logging.INFO)
Loading