Skip to content

Commit

Permalink
Added support for NVMe tests
Browse files Browse the repository at this point in the history
Fixes issue NVMe Windows self-test seems a bit off #64
  • Loading branch information
ralequi committed Mar 27, 2023
1 parent aebb2b5 commit b04e4d7
Show file tree
Hide file tree
Showing 25 changed files with 1,564 additions and 122 deletions.
34 changes: 21 additions & 13 deletions pySMART/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,11 @@ class members, including the SMART attribute table and self-test log.

if interface == 'nvme':
self.if_attributes = NvmeAttributes(iter(_stdout))

# Get Tests
for test in self.if_attributes.tests:
self.tests.append(TestEntry('nvme', test.num, test.description, test.status, test.powerOnHours,
test.failingLBA, nsid=test.nsid, sct=test.sct, code=test.code, remain=100-test.progress))
else:
self.if_attributes = None

Expand All @@ -974,10 +979,6 @@ class members, including the SMART attribute table and self-test log.
if parse_ascq:
message += ' ' + line.lstrip().rstrip()
if parse_self_tests:
num = line[0:3]
if '#' not in num:
continue

# Detect Test Format

## SCSI/SAS FORMAT ##
Expand All @@ -987,7 +988,15 @@ class members, including the SMART attribute table and self-test log.
# Description number (hours)
# # 1 Background short Completed - 33124 - [- - -]
format_scsi = re.compile(
r'^[#\s]*([^\s]+)\s{2,}(.*[^\s])\s{2,}(.*[^\s])\s{2,}(.*[^\s])\s{2,}(.*[^\s])\s{2,}(.*[^\s])\s+\[([^\s]+)\s+([^\s]+)\s+([^\s]+)\]$').match(line)
r'^[#\s]*(\d+)\s{2,}(.*[^\s])\s{2,}(.*[^\s])\s{2,}(.*[^\s])\s{2,}(.*[^\s])\s{2,}(.*[^\s])\s+\[([^\s]+)\s+([^\s]+)\s+([^\s]+)\]$').match(line)

## ATA FORMAT ##
# Example smartctl output:
# SMART Self-test log structure revision number 1
# Num Test_Description Status Remaining LifeTime(hours) LBA_of_first_error
# # 1 Extended offline Completed without error 00% 46660 -
format_ata = re.compile(
r'^[#\s]*(\d+)\s{2,}(.*[^\s])\s{2,}(.*[^\s])\s{1,}(.*[^\s])\s{2,}(.*[^\s])\s{2,}(.*[^\s])$').match(line)

if format_scsi is not None:
format = 'scsi'
Expand All @@ -1013,15 +1022,10 @@ class members, including the SMART attribute table and self-test log.
asc=asc,
ascq=ascq
))
else:
elif format_ata is not None:
## ATA FORMAT ##
# Example smartctl output:
# SMART Self-test log structure revision number 1
# Num Test_Description Status Remaining LifeTime(hours) LBA_of_first_error
# # 1 Extended offline Completed without error 00% 46660 -
format = 'ata'
parsed = re.compile(
r'^[#\s]*([^\s]+)\s{2,}(.*[^\s])\s{2,}(.*[^\s])\s{1,}(.*[^\s])\s{2,}(.*[^\s])\s{2,}(.*[^\s])$').match(line).groups()
parsed = format_ata.groups()
num = parsed[0]
test_type = parsed[1]
status = parsed[2]
Expand All @@ -1038,6 +1042,9 @@ class members, including the SMART attribute table and self-test log.
TestEntry(format, num, test_type, status,
hours, lba, remain=remain)
)
else:
pass

# Basic device information parsing
if any_in(line, 'Device Model', 'Product', 'Model Number'):
self.model = line.split(':')[1].lstrip().rstrip()
Expand Down Expand Up @@ -1182,8 +1189,9 @@ class members, including the SMART attribute table and self-test log.
pass
parse_running_test = False

if all_in(line, 'Description', '(hours)'):
if "Self-test log" in line:
parse_self_tests = True # Set flag to capture test entries
continue

#######################################
# SCSI only #
Expand Down
238 changes: 188 additions & 50 deletions pySMART/interface/nvme.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from enum import Enum
import re
import humanfriendly
from typing import Iterator, Union, List
from typing import Optional, Iterator, Union, List


class NvmeStatus(Enum):
Expand Down Expand Up @@ -168,17 +168,17 @@ class NvmeError(object):
cs : The command specific
"""

def __init__(self, id: int = None, count: int = None, sqid: int = None, cmdid: int = None, status: int = None, peloc: int = None, lba: int = None, nsid: int = None, vs: int = None):
self.id: int = id
self.count: int = count
self.sqid: int = sqid
self.cmdid: int = cmdid
def __init__(self, num: int, errCount: int, sqId: int, cmdId: int, status: int, peLoc: int, lba: Optional[int] = None, nsid: Optional[int] = None, vs: Optional[int] = None):
self.num: int = num
self.errCount: int = errCount
self.sqId: int = sqId
self.cmdId: int = cmdId
self.status: int = status
self.peloc: int = peloc
self.lba: Union[int, None] = lba
self.nsid: Union[int, None] = nsid
self.vs: Union[int, None] = vs
self.cs: Union[int, None] = None
self.peLoc: int = peLoc
self.lba: Optional[int] = lba
self.nsid: Optional[int] = nsid
self.vs: Optional[int] = vs
self.cs: Optional[int] = None

@property
def status_str(self) -> str:
Expand Down Expand Up @@ -409,7 +409,66 @@ def __str__(self):
return self.__repr__()

def __repr__(self):
return f'{self.status}: ({self.status_str()})'
return f'{self.status}: ({self.status_str})'

def __getstate__(self, all_info=True):
"""
Allows us to send a pySMART diagnostics object over a serializable
medium which uses json (or the likes of json) payloads
"""
return vars(self)

def __setstate__(self, state):
self.__dict__.update(state)


class NvmeSelfTest(object):
"""This class represents a test entry of a NVMe device
Attributes:
num (int): The test number
description (str): The test description
status (str): The test status
powerOnHours (int): The power on hours
failingLBA (Optional[int]): The failing LBA
nsid (Optional[int]): The namespace ID
sct (Optional[str]): The SCT
code (Optional[str]): The code
progress (int): The progress of the test. Defaults to 100%
"""

def __init__(self, num: int, description: str, status: str, powerOnHours: int, failingLBA: Optional[int] = None, nsid: Optional[int] = None, sct: Optional[str] = None, code: Optional[str] = None, progress: int = 100):

self.num: int = num
self.description: str = description
self.status: str = status
self.powerOnHours: int = powerOnHours
self.failingLBA: Optional[int] = failingLBA
self.nsid: Optional[int] = nsid
self.sct: Optional[str] = sct
self.code: Optional[str] = code
self.progress: int = progress

def __str__(self):
return self.__repr__()

def __repr__(self):
# Example smartctl output
# Self-test Log (NVMe Log 0x06)
# Self-test status: Extended self-test in progress (28% completed)
# Num Test_Description Status Power_on_Hours Failing_LBA NSID SCT Code
# 0 Extended Completed without error 3441 - - - -
return ("{0:>2} {1:18}{2:29}{3:16}{4:13}{5:5}{6:4}{7:4}".format(
self.num,
self.description,
self.status,
self.powerOnHours,
self.failingLBA,
self.nsid,
self.sct,
self.code
))

def __getstate__(self, all_info=True):
"""
Expand Down Expand Up @@ -449,35 +508,36 @@ class NvmeAttributes(object):
errors : List of errors
"""

def __init__(self, data: Iterator[str] = None):
def __init__(self, data: Optional[Iterator[str]] = None):
"""Initializes the attributes
Args:
data (Iterator[str], optional): Iterator of the lines of the output of the command nvme smart-log. Defaults to None.
"""

self.critialWarning: int = None
self.temperature: int = None
self.availableSpare: int = None
self.availableSpareThreshold: int = None
self.percentageUsed: int = None
self.dataUnitsRead: int = None
self.bytesRead: int = None
self.dataUnitsWritten: int = None
self.bytesWritten: int = None
self.hostReadCommands: int = None
self.hostWriteCommands: int = None
self.controllerBusyTime: int = None
self.powerCycles: int = None
self.powerOnHours: int = None
self.unsafeShutdowns: int = None
self.integrityErrors: int = None
self.errorEntries: int = None
self.warningTemperatureTime: int = None
self.criticalTemperatureTime: int = None
self.critialWarning: Optional[int] = None
self.temperature: Optional[int] = None
self.availableSpare: Optional[int] = None
self.availableSpareThreshold: Optional[int] = None
self.percentageUsed: Optional[int] = None
self.dataUnitsRead: Optional[int] = None
self.bytesRead: Optional[int] = None
self.dataUnitsWritten: Optional[int] = None
self.bytesWritten: Optional[int] = None
self.hostReadCommands: Optional[int] = None
self.hostWriteCommands: Optional[int] = None
self.controllerBusyTime: Optional[int] = None
self.powerCycles: Optional[int] = None
self.powerOnHours: Optional[int] = None
self.unsafeShutdowns: Optional[int] = None
self.integrityErrors: Optional[int] = None
self.errorEntries: Optional[int] = None
self.warningTemperatureTime: Optional[int] = None
self.criticalTemperatureTime: Optional[int] = None

self.errors: List[NvmeError] = []
self.tests: List[NvmeSelfTest] = []

if data is not None:
self.parse(data)
Expand Down Expand Up @@ -591,32 +651,107 @@ def parse(self, data: Iterator[str]) -> None:
r'^\s*(?P<num>\d+)\s+(?P<errCount>\d+)\s+(?P<sqId>\d+)\s+(?P<cmdId>\w+)\s+(?P<status>\w+)\s+(?P<peLoc>\w+)\s+(?P<lba>\S+)\s+(?P<nsid>\S+)\s+(?P<vs>\S+)\s*$', line)

if match:
error = NvmeError()

error.num = int(match.group('num'))
error.errCount = int(match.group('errCount'))
error.sqId = int(match.group('sqId'))
error.cmdId = int(match.group('cmdId'), 16)
error.status = int(match.group('status'), 16)
error.peLoc = int(match.group('peLoc'), 16)

if match.group('lba') == '-':
error.lba = None
else:
error = NvmeError(
num=int(match.group('num')),
errCount=int(match.group('errCount')),
sqId=int(match.group('sqId')),
cmdId=int(match.group('cmdId'), 16),
status=int(match.group('status'), 16),
peLoc=int(match.group('peLoc'), 16)
)

if match.group('lba') != '-':
error.lba = int(match.group('lba'), 16)

if match.group('nsid') == '-':
error.nsid = None
else:
if match.group('nsid') != '-':
error.nsid = int(match.group('nsid'))

if match.group('vs') == '-':
error.vs = None
else:
if match.group('vs') != '-':
error.vs = int(match.group('vs'), 16)

self.errors.append(error)

elif line.startswith('Self-test Log (NVMe Log 0x06)'):

## NVME FORMAT ##
# Example smartctl output
# Self-test Log (NVMe Log 0x06)
# Self-test status: Extended self-test in progress (28% completed)
# Num Test_Description Status Power_on_Hours Failing_LBA NSID SCT Code
# 0 Extended Completed without error 3441 - - - -
nvme_entry_regex = re.compile(
r'^[#\s]*(\d+)\s{2,}(.*[^\s])\s{2,}(.*[^\s])\s{2,}(\d+)\s{2,}(.*[^\s])\s{2,}(.*[^\s])\s{2,}(.*[^\s])\s{2,}(.*)$')

line = next(data)

# Check the current test
# Example of non running test: Self-test status: No self-test in progress
# Example of running test: Self-test status: Extended self-test in progress (4% completed)
currentTest = None
if line.startswith('Self-test status:'):
line = line[18:].strip()

# check
if line.startswith('No self-test in progress'):
pass
else:
# parse the test
match = re.match(
r'^(\w+) self-test in progress \((\d+)% completed\)$', line)
if match:
powerOnHours = self.powerOnHours
if powerOnHours is None:
powerOnHours = 0

currentTest = NvmeSelfTest(
num=-1,
description=match.group(1),
status='Running',
powerOnHours=powerOnHours,
progress=int(match.group(2))
)

# Parse tests
for line in data:
line = line.strip()

match = nvme_entry_regex.match(line)

if match:

num = int(match.group(1))
description = match.group(2)
status = match.group(3)
powerOnHours = int(match.group(4))

failingLBA = None
if match.group(5) != '-':
failingLBA = int(match.group(5))

nsid = None
if match.group(6) != '-':
nsid = int(match.group(6))

sct = match.group(7)
code = match.group(8)

test = NvmeSelfTest(
num=num,
description=description,
status=status,
powerOnHours=powerOnHours,
failingLBA=failingLBA,
nsid=nsid,
sct=sct,
code=code
)

self.tests.append(test)

if currentTest is not None:
currentTest.num = len(self.tests)
self.tests.append(currentTest)

def __getstate__(self, all_info=True):
"""
Allows us to send a pySMART diagnostics object over a serializable
Expand All @@ -627,5 +762,8 @@ def __getstate__(self, all_info=True):
if ret['errors'] is not None:
ret['errors'] = [vars(e) for e in ret['errors']]

if ret['tests'] is not None:
ret['tests'] = [vars(e) for e in ret['tests']]

def __setstate__(self, state):
self.__dict__.update(state)
Loading

0 comments on commit b04e4d7

Please sign in to comment.