|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +import hashlib |
| 4 | +import hmac |
| 5 | +from functools import lru_cache |
| 6 | +from pathlib import Path |
| 7 | +from typing import BinaryIO |
| 8 | + |
| 9 | +from dissect.util.stream import AlignedStream |
| 10 | + |
| 11 | +from dissect.database.sqlite3.encryption.sqlcipher.exception import SQLCipherError |
| 12 | +from dissect.database.sqlite3.exception import InvalidDatabase |
| 13 | +from dissect.database.sqlite3.sqlite3 import SQLITE3_HEADER_MAGIC, SQLite3 |
| 14 | + |
| 15 | +try: |
| 16 | + from Crypto.Cipher import AES |
| 17 | + |
| 18 | + HAS_CRYPTO = True |
| 19 | + |
| 20 | +except ImportError: |
| 21 | + HAS_CRYPTO = False |
| 22 | + |
| 23 | + |
| 24 | +class SQLCipher(SQLite3): |
| 25 | + """SQLCipher Community Edition implementation. |
| 26 | +
|
| 27 | + Instantiate with a subclass from :class:`SQLCipher4`, :class:`SQLCipher3`, :class:`SQLCipher2` |
| 28 | + or :class:`SQLCipher1`. |
| 29 | +
|
| 30 | + Decrypts a SQLCipher database from the given path or file-like oject. |
| 31 | +
|
| 32 | + Example usage: |
| 33 | + >>> from dissect.database.sqlite3.encryption import SQLCipher4 |
| 34 | + >>> db = SQLCipher4(Path("file.db"), "passphrase") |
| 35 | + >>> row = db.table("MyTable").row(0) |
| 36 | +
|
| 37 | + Args: |
| 38 | + fh (Path | BinaryIO): The path or file-like object to open. |
| 39 | + passphrase (str | bytes): String or bytes passphrase. |
| 40 | + salt (bytes): Optionally provide the 16-byte salt directly. |
| 41 | + plaintext_header_size (int): Size of plaintext header to use. |
| 42 | + page_size (int): Override size of each page. |
| 43 | + kdf_iter (int): Override amount of KDF iterations. |
| 44 | + kdf_algo (str | hashlib._Hash): Override KDF digest alrorithm. |
| 45 | + hmac_algo (str | hashlib._Hash): Override HMAC digest algorithm. |
| 46 | + no_kdf (bool): Disable KDF from passphrase, use as raw key. |
| 47 | + verify_hmac (bool): Optionally verify digest of every page. |
| 48 | +
|
| 49 | + Raises: |
| 50 | + SQLCipherError: If decryption failed using the provided arguments. |
| 51 | +
|
| 52 | + References: |
| 53 | + - https://www.zetetic.net/sqlcipher/design/ |
| 54 | + - https://github.com/sqlcipher/sqlcipher |
| 55 | + """ |
| 56 | + |
| 57 | + DEFAULT_PAGE_SIZE: int |
| 58 | + DEFAULT_KDF_ITER: int |
| 59 | + DEFAULT_KDF_ALGO: str |
| 60 | + DEFAULT_HMAC_ALGO: str | None |
| 61 | + |
| 62 | + def __init__( |
| 63 | + self, |
| 64 | + fh: Path | BinaryIO, |
| 65 | + passphrase: str | bytes, |
| 66 | + *, |
| 67 | + salt: bytes | None = None, |
| 68 | + plaintext_header_size: int | None = None, |
| 69 | + page_size: int | None = None, |
| 70 | + kdf_iter: int | None = None, |
| 71 | + kdf_algo: str | None = None, |
| 72 | + hmac_algo: str | None = None, |
| 73 | + no_kdf: bool = False, |
| 74 | + verify_hmac: bool = False, |
| 75 | + ): |
| 76 | + if not HAS_CRYPTO: |
| 77 | + raise RuntimeError("Missing dependency pycryptodome") |
| 78 | + |
| 79 | + if isinstance(fh, Path): |
| 80 | + cipher_fh = fh.open("rb") |
| 81 | + cipher_path = fh |
| 82 | + else: |
| 83 | + cipher_fh = fh |
| 84 | + cipher_path = None |
| 85 | + |
| 86 | + self.cipher_fh = cipher_fh |
| 87 | + self.cipher_path = cipher_path |
| 88 | + self.cipher_page_size = page_size or self.DEFAULT_PAGE_SIZE |
| 89 | + self.kdf_iter = kdf_iter or self.DEFAULT_KDF_ITER |
| 90 | + self.kdf_algo = kdf_algo or self.DEFAULT_KDF_ALGO |
| 91 | + self.hmac_algo = hmac_algo or self.DEFAULT_HMAC_ALGO |
| 92 | + self.verify_hmac = verify_hmac |
| 93 | + |
| 94 | + if not hasattr(self.cipher_fh, "read"): |
| 95 | + raise ValueError("Provided file handle cannot be read from") |
| 96 | + |
| 97 | + if isinstance(passphrase, str): |
| 98 | + passphrase = passphrase.encode() |
| 99 | + |
| 100 | + if not passphrase: |
| 101 | + raise SQLCipherError("No passphrase provided") |
| 102 | + |
| 103 | + if isinstance(self.hmac_algo, str): |
| 104 | + self.hmac_algo = hashlib.new(self.hmac_algo) |
| 105 | + |
| 106 | + if isinstance(self.kdf_algo, str): |
| 107 | + self.kdf_algo = hashlib.new(self.kdf_algo) |
| 108 | + |
| 109 | + # Part of the header can be plaintext. We can infer that or it can be passed upon initialization. |
| 110 | + # https://www.zetetic.net/sqlcipher/sqlcipher-api/#cipher_plaintext_header_size |
| 111 | + if plaintext_header_size: |
| 112 | + self.plaintext_header_size = plaintext_header_size |
| 113 | + |
| 114 | + # The default and recommended plaintext header size is 32 bytes. |
| 115 | + elif (header_or_salt := self.cipher_fh.read(16)) == SQLITE3_HEADER_MAGIC: |
| 116 | + self.plaintext_header_size = 32 |
| 117 | + else: |
| 118 | + self.plaintext_header_size = None |
| 119 | + |
| 120 | + if self.plaintext_header_size and not salt: |
| 121 | + raise SQLCipherError("Plaintext header has no salt, please provide salt manually") |
| 122 | + |
| 123 | + self.salt = salt or header_or_salt |
| 124 | + self.passphrase = passphrase |
| 125 | + |
| 126 | + if no_kdf: |
| 127 | + self.key = self.passphrase |
| 128 | + else: |
| 129 | + self.key = derive_key( |
| 130 | + self.passphrase, self.salt, self.kdf_iter, self.kdf_algo.name if self.kdf_algo else None |
| 131 | + ) |
| 132 | + |
| 133 | + # The hmac key is derived using the raw or derived database key with it's own salt and two kdf iterations. |
| 134 | + self.hmac_salt = bytes(i ^ 0x3A for i in self.salt) |
| 135 | + self.hmac_key = derive_key(self.key, self.hmac_salt, 2, self.hmac_algo.name if self.hmac_algo else None) |
| 136 | + |
| 137 | + # Initialize the decrypted SQLite3 stream as a file-like object and see if that works. |
| 138 | + try: |
| 139 | + super().__init__(self.stream(), wal=None, checkpoint=None) |
| 140 | + except (InvalidDatabase, SQLCipherError) as e: |
| 141 | + raise SQLCipherError("Decryption of SQLCipher database failed or is not a database") from e |
| 142 | + |
| 143 | + # Sanity check to prevent further issues down the line. |
| 144 | + if self.header.page_size != self.cipher_page_size or self.header.schema_format_number not in (1, 2, 3, 4): |
| 145 | + raise SQLCipherError("Decryption of SQLCipher database failed or is not a database") |
| 146 | + |
| 147 | + def __repr__(self) -> str: |
| 148 | + return ( |
| 149 | + f"<{self.__class__.__name__} " |
| 150 | + f"fh={self.cipher_path or self.cipher_fh} " |
| 151 | + f"wal={self.wal} " |
| 152 | + f"checkpoint={bool(self.checkpoint)} " |
| 153 | + f"pages={self.header.page_count}>" |
| 154 | + ) |
| 155 | + |
| 156 | + def close(self) -> None: |
| 157 | + """Close the database.""" |
| 158 | + super().close() |
| 159 | + # Only close DB handle if we opened it using a path |
| 160 | + if self.cipher_path is not None: |
| 161 | + self.cipher_fh.close() |
| 162 | + |
| 163 | + def stream(self) -> SQLCipherStream: |
| 164 | + """Create a transparent decryption stream.""" |
| 165 | + return SQLCipherStream(self) |
| 166 | + |
| 167 | + |
| 168 | +class SQLCipherStream(AlignedStream): |
| 169 | + """Implements a transparent decryption stream for SQLCipher databases.""" |
| 170 | + |
| 171 | + def __init__(self, sqlcipher: SQLCipher): |
| 172 | + super().__init__(None, sqlcipher.cipher_page_size) |
| 173 | + |
| 174 | + self.fh = sqlcipher.cipher_fh |
| 175 | + self.sqlcipher = sqlcipher |
| 176 | + |
| 177 | + self._read_page = lru_cache(4096)(self._read_page) |
| 178 | + |
| 179 | + def _read(self, offset: int, length: int) -> bytes: |
| 180 | + """Calculates which pages to read from based on the given offset and length. Returns decrypted bytes.""" |
| 181 | + |
| 182 | + start_page = offset // self.align |
| 183 | + num_pages = length // self.align |
| 184 | + return b"".join( |
| 185 | + self._read_page(num + 1, self.sqlcipher.verify_hmac) for num in range(start_page, start_page + num_pages) |
| 186 | + ) |
| 187 | + |
| 188 | + def _read_page(self, page_num: int, verify_hmac: bool = False) -> bytes: |
| 189 | + """Decrypt and read from the given SQLCipher page number. |
| 190 | +
|
| 191 | + References: |
| 192 | + - https://github.com/sqlcipher/sqlcipher-tools/blob/master/decrypt.c |
| 193 | + """ |
| 194 | + |
| 195 | + if page_num < 1: |
| 196 | + raise ValueError("The first page number is 1") |
| 197 | + |
| 198 | + fh = self.sqlcipher.cipher_fh |
| 199 | + page_size = self.sqlcipher.cipher_page_size |
| 200 | + |
| 201 | + # Calculate the absolute offset in the SQLCipher file handle by multiplying the page number with |
| 202 | + # the SQLCipher page size. |
| 203 | + offset = (page_num - 1) * page_size |
| 204 | + |
| 205 | + # Calculate size of the page iv (always 16 bytes) plus the hmac digest size. |
| 206 | + hmac_algo = self.sqlcipher.hmac_algo |
| 207 | + digest_size = hmac_algo.digest_size if hmac_algo else 0 |
| 208 | + align = 16 + digest_size |
| 209 | + |
| 210 | + # Calculate the size of the encrypted data by substracting the iv and hmac size from the page size. |
| 211 | + # The sum of the iv and hmac size needs to be adjusted to 16 byte blocks. |
| 212 | + if align % 16 != 0: |
| 213 | + align = (align + 15) & ~15 |
| 214 | + enc_size = page_size - align |
| 215 | + |
| 216 | + # By default, the first page 'contains' the database salt (in place of SQLITE_HEAER_MAGIC) so we substract those |
| 217 | + # first 16 bytes from the page size and update the ciphertext offset and size accordingly. |
| 218 | + header_offset = 0 |
| 219 | + header = b"" |
| 220 | + if page_num == 1: |
| 221 | + header_offset = self.sqlcipher.plaintext_header_size or 16 |
| 222 | + enc_size -= header_offset |
| 223 | + offset += header_offset |
| 224 | + |
| 225 | + # Prepare the plaintext header of the SQLite3 database if this is the first page, or read the plaintext |
| 226 | + # header according to the plaintext_header_size variable. |
| 227 | + if header_offset == 16: |
| 228 | + header = SQLITE3_HEADER_MAGIC |
| 229 | + elif header_offset: |
| 230 | + fh.seek(0) |
| 231 | + header = fh.read(header_offset) |
| 232 | + |
| 233 | + # The last part of the page contains the iv and optionally a hmac digest. |
| 234 | + fh.seek(offset + enc_size) |
| 235 | + iv = fh.read(16) |
| 236 | + page_hmac = fh.read(digest_size) if digest_size else None |
| 237 | + |
| 238 | + fh.seek(offset) |
| 239 | + ciphertext = fh.read(enc_size) |
| 240 | + |
| 241 | + if len(iv) != 16 or not ciphertext: |
| 242 | + raise EOFError |
| 243 | + |
| 244 | + # Optionally verify the hmac signature with the page's ciphertext. Assumes default CIPHER_FLAG_LE_PGNO. |
| 245 | + # https://github.com/sqlcipher/sqlcipher-tools/blob/master/verify.c |
| 246 | + # https://github.com/sqlcipher/sqlcipher/blob/master/src/sqlcipher.c @ sqlcipher_page_hmac |
| 247 | + if verify_hmac: |
| 248 | + if not hmac_algo: |
| 249 | + raise ValueError("verify_hmac is set to True but no HMAC algorithm is selected") |
| 250 | + |
| 251 | + hmac_msg = ciphertext + iv + page_num.to_bytes(4, "little") |
| 252 | + calc_hmac = hmac.digest(self.sqlcipher.hmac_key, hmac_msg, hmac_algo.name) |
| 253 | + |
| 254 | + if calc_hmac != page_hmac: |
| 255 | + raise SQLCipherError( |
| 256 | + f"HMAC digest mismatch for page {page_num} (expected {page_hmac.hex()}, got {calc_hmac.hex()})" |
| 257 | + ) |
| 258 | + |
| 259 | + # Decrypt the ciphertext using AES CBC and append null bytes so the plaintext aligns with the page size. |
| 260 | + cipher = AES.new(self.sqlcipher.key, AES.MODE_CBC, iv) |
| 261 | + plaintext = cipher.decrypt(ciphertext) + (align * b"\x00") |
| 262 | + |
| 263 | + # Return the plaintext prepended by the optional plaintext header. |
| 264 | + return header + plaintext |
| 265 | + |
| 266 | + |
| 267 | +class SQLCipher4(SQLCipher): |
| 268 | + DEFAULT_PAGE_SIZE = 4096 |
| 269 | + DEFAULT_KDF_ITER = 256_000 |
| 270 | + DEFAULT_KDF_ALGO = "SHA512" |
| 271 | + DEFAULT_HMAC_ALGO = "SHA512" |
| 272 | + |
| 273 | + |
| 274 | +class SQLCipher3(SQLCipher): |
| 275 | + DEFAULT_PAGE_SIZE = 1024 |
| 276 | + DEFAULT_KDF_ITER = 64_000 |
| 277 | + DEFAULT_KDF_ALGO = "SHA1" |
| 278 | + DEFAULT_HMAC_ALGO = "SHA1" |
| 279 | + |
| 280 | + |
| 281 | +class SQLCipher2(SQLCipher): |
| 282 | + DEFAULT_PAGE_SIZE = 1024 |
| 283 | + DEFAULT_KDF_ITER = 4000 |
| 284 | + DEFAULT_KDF_ALGO = "SHA1" |
| 285 | + DEFAULT_HMAC_ALGO = "SHA1" |
| 286 | + |
| 287 | + |
| 288 | +class SQLCipher1(SQLCipher): |
| 289 | + DEFAULT_PAGE_SIZE = 1024 |
| 290 | + DEFAULT_KDF_ITER = 4000 |
| 291 | + DEFAULT_KDF_ALGO = "SHA1" |
| 292 | + DEFAULT_HMAC_ALGO = None |
| 293 | + |
| 294 | + |
| 295 | +def derive_key(passphrase: bytes, salt: bytes, kdf_iter: int, kdf_algo: str | None) -> bytes: |
| 296 | + """Derive the database key as SQLCipher would using PBKDF2.""" |
| 297 | + |
| 298 | + if not kdf_iter or not kdf_algo: |
| 299 | + return passphrase |
| 300 | + |
| 301 | + return hashlib.pbkdf2_hmac(kdf_algo, passphrase, salt, kdf_iter, 32) |
0 commit comments