import base64

import entry


# TODO: Include file number in the error info
class FileFormatError(Exception):
    """Raised when the header or an entry line is malformed."""


class VersionMismatch(Exception):
    """Raised when the header announces a version other than 0."""


def parse_header(header):
    """parse_header(bytes) → str

    Validate the header line and return the file comment.

    The accepted layout is the magic ``SSHWOT``, the single version byte
    ``0`` and then either a newline (no comment) or a space followed by
    a UTF-8 comment and a terminating newline.

    Returns the comment, or '' when the header carries none.

    Raises FileFormatError if the header is malformed and VersionMismatch
    if the version byte is present but is not ``0``.
    """
    assert isinstance(header, bytes)

    if header[0:6] != b'SSHWOT':
        raise FileFormatError('Invalid magic')

    # Version 0 is the current one
    version = header[6:7]
    if version == b'':
        raise FileFormatError('No newline after header')
    if version != b'0':
        # Show the version character itself; formatting version[0] with
        # %i would print its byte value (49 for version "1").
        raise VersionMismatch('Version %s not supported'
                              % version.decode('utf-8', 'replace'))

    separator = header[7:8]
    if separator == b'\n':
        # No comment at all
        return ''
    if separator != b' ':
        # Decode leniently: the offending bytes may not be valid UTF-8 and
        # building the message must not raise UnicodeDecodeError.
        raise FileFormatError("Expected a space or a newline but got '%s' instead"
                              % header[7:].decode('utf-8', 'replace'))

    # A space announces a comment, so one has to follow
    if header[8:9] == b'\n':
        raise FileFormatError('Missing comment or spurious space in the header')
    if header[-1] != 0x0a:
        raise FileFormatError('Missing newline at the end of the header')
    try:
        return header[8:-1].decode('utf-8')
    except UnicodeDecodeError as err:
        raise FileFormatError('Comment is not valid utf-8') from err


def parse_entry(line):
    """parse_entry(bytes) → Entry

    Parse one entry line.

    The line holds three consecutive 44-character base64 fields (salt,
    hashed host, fingerprint) followed by either a newline or a space, a
    UTF-8 comment and a newline.

    Returns entry.Entry(salt, hashed_host, fingerprint, comment) where the
    first three are the decoded fields and comment is '' when absent.

    Raises FileFormatError if the line is malformed.
    """
    assert isinstance(line, bytes)

    def extract_b64_field(rest):
        """extract_b64_field(bytes) → (bytes: decoded_field, bytes: rest)"""
        field_b64 = rest[0:44]
        if len(field_b64) != 44:
            raise FileFormatError('Unexpected end of line')
        try:
            # binascii.Error, raised on invalid input, subclasses ValueError
            field = base64.b64decode(field_b64, validate=True)
        except ValueError as err:
            # Decode leniently so non-UTF-8 garbage in the field is still
            # reported as FileFormatError and not UnicodeDecodeError.
            raise FileFormatError('Malformed base64 string: %s'
                                  % field_b64.decode('utf-8', 'replace')) from err
        return field, rest[44:]

    salt, rest = extract_b64_field(line)
    hashed_host, rest = extract_b64_field(rest)
    fingerprint, rest = extract_b64_field(rest)

    # What do we have after the three fields?
    separator = rest[0:1]
    if separator == b'\n':
        # Just the newline, no comment
        comment = ''
    elif separator == b' ':
        # A space announces a comment, so one has to follow
        if rest[1:2] == b'\n':
            raise FileFormatError('Missing comment or spurious space in the entry')
        if rest[-1] != 0x0a:
            raise FileFormatError('No newline after entry')
        try:
            comment = rest[1:-1].decode('utf-8')
        except UnicodeDecodeError as err:
            raise FileFormatError('Comment is not valid utf-8') from err
    else:
        # Something else; decode leniently as it may not be valid UTF-8
        raise FileFormatError('Expected a space or a newline but got "%s" instead'
                              % rest.decode('utf-8', 'replace'))

    return entry.Entry(salt, hashed_host, fingerprint, comment)


def read(f):
    """read(file(rb)) → ([Entry]: entries, str: file_comment)

    Read a whole file: the first line is parsed as the header and every
    following line as an entry. f is iterated line by line and must yield
    bytes.

    Raises FileFormatError if the file is empty, plus whatever
    parse_header and parse_entry raise.
    """
    lines = list(f)
    if not lines:
        raise FileFormatError('Missing header')

    file_comment = parse_header(lines[0])
    entries = [parse_entry(line) for line in lines[1:]]
    return entries, file_comment