"""Parser for the SSHWOT file format.

A file consists of one header line followed by zero or more entry lines,
every line terminated by a newline:

    SSHWOT <version>[ <comment>]
    <salt> <hashed host> <fingerprint>[ <comment>]

The salt, hashed host and fingerprint are base64 strings (decoded with
``hashing.base64dec``); comments are UTF-8 and may contain spaces.
"""

import hashing
import entry


class _LineAnnotatedError(Exception):
    """Shared implementation of the errors raised by this module.

    ``string`` is the human-readable message.  ``line`` is the 1-indexed
    line number the error refers to, or None while it is not known; read()
    fills it in before letting the error propagate.
    """

    def __init__(self, string):
        # Hand the message to Exception as well so that ``args`` and
        # ``repr()`` carry it.
        super().__init__(string)
        self.string = string
        self.line = None

    def __str__(self):
        if self.line is None:
            return self.string
        return 'Line %i: %s' % (self.line, self.string)


class FileFormatError(_LineAnnotatedError):
    """The data does not follow the expected file format."""


class VersionMismatch(_LineAnnotatedError):
    """The file's version is not one this code handles.

    NOTE(review): nothing in this module raises this error; read() only
    catches it.
    """


def _decode_comment(raw, where):
    """Decode the optional trailing comment field of a line.

    ``raw`` is the bytes following the separator space and ``where`` names
    the kind of line ('header' or 'entry') for the error message.

    Raises FileFormatError if the field is empty or is not valid UTF-8.
    """
    if len(raw) == 0:
        # The separator space promised a comment but nothing follows it.
        raise FileFormatError(
            'Missing comment or spurious space in the %s' % where)
    try:
        return raw.decode('utf-8')
    except UnicodeDecodeError as err:
        raise FileFormatError('Comment is not valid utf-8') from err


def _decode_b64_field(b64):
    """Decode one base64 field, turning decoding errors into FileFormatError."""
    try:
        return hashing.base64dec(b64)
    except ValueError as err:
        # binascii.Error is a subclass of ValueError, so this single clause
        # covers both.  The field is decoded leniently for the message
        # because a malformed field need not be valid UTF-8 either.
        raise FileFormatError(
            'Malformed base64 string: %s'
            % b64.decode('utf-8', errors='replace')) from err


def parse_header(header):
    """parse_header(bytes) → str

    Validate the header line and return the file comment, or '' if the
    header carries none.

    Raises FileFormatError if the line does not end in a newline, has fewer
    than two fields, has the wrong magic, or has a malformed comment.
    """
    assert isinstance(header, bytes)

    if not header.endswith(b'\n'):
        raise FileFormatError('No newline after header')

    # Split into magic, version and (optionally) comment.  Limiting the
    # split to two separators keeps a comment containing spaces in one piece.
    fields = header[:-1].split(b' ', 2)
    if len(fields) < 2:
        raise FileFormatError('Too few fields in the header, expected at least magic and version')

    if fields[0] != b'SSHWOT':
        raise FileFormatError('Invalid magic')

    # NOTE(review): the version field (fields[1]) is required to be present
    # but its value is not checked here -- confirm whether it should be.

    if len(fields) < 3:
        return ''
    return _decode_comment(fields[2], 'header')


def parse_entry(line):
    """parse_entry(bytes) → Entry

    Parse one entry line into an entry.Entry built from the decoded salt,
    hashed host and fingerprint plus the comment ('' if there is none).

    Raises FileFormatError if the line does not end in a newline, has fewer
    than three fields, contains a malformed base64 field, or has a
    malformed comment.
    """
    assert isinstance(line, bytes)

    if not line.endswith(b'\n'):
        raise FileFormatError('No newline after entry')

    # Split into salt, hashed host, fingerprint and (optionally) comment;
    # the comment is the remainder of the line and may contain spaces.
    fields = line[:-1].split(b' ', 3)
    if len(fields) < 3:
        raise FileFormatError('Too few fields in the entry, expected in the very least salt, hashed host, and fingerprint')

    salt = _decode_b64_field(fields[0])
    hashed_host = _decode_b64_field(fields[1])
    fingerprint = _decode_b64_field(fields[2])

    if len(fields) == 4:
        comment = _decode_comment(fields[3], 'entry')
    else:
        comment = ''

    return entry.Entry(salt, hashed_host, fingerprint, comment)


def read(f):
    """read(file(rb)) → ([Entry]: entries, str: file_comment)

    Parse everything iterating over ``f`` yields: the first line as the
    header, every following line as an entry.

    Raises FileFormatError if there is no header or if any line is
    malformed; for a malformed line the error's ``line`` attribute is set
    to its 1-indexed line number.
    """
    lines = iter(f)

    header = next(lines, None)
    if header is None:
        raise FileFormatError('Missing header')

    try:
        file_comment = parse_header(header)
    except (FileFormatError, VersionMismatch) as err:
        err.line = 1
        raise

    entries = []
    # The header was line 1, so the entries start at line 2.
    for linenum, line in enumerate(lines, start=2):
        try:
            entries.append(parse_entry(line))
        except FileFormatError as err:
            err.line = linenum
            raise

    return entries, file_comment