개인적으로 지금 DB에 자료를 암호화해서 넣은 후 필요한 경우에만 암호화를 풀어서 자료를 열람하거나 쿼리의 조건으로 쓰는 작업을 해야하는데, 이것에 대해서는 Spoqa 기술 블로그에 이미 간단한 방법이 소개되어 있습니다. 저 글에서는 Flask-SQLAlchemy(로 추측되는 무언가), PyCrypto, MySQL등을 사용해서 저장값이 Float형인 경우만 고려해서 구현했는데, 저는 PostgreSQL을 써야했기에 저 글을 그대로 가져다 쓸 수 없었습니다. 그래서 가장 처음 한 것이 sqlalchemy.sql.expression.func의 실제 동작에 대한 분석입니다. 그리고 문자열이나 Blob형 자료를 다루기 위해서는 padding이 중요한데, padding을 어떻게 해야할지 알아보기 위해 PostgreSQL에서 쓰는 실제 padding algorithm을 소스를 분석하여 알아보았습니다. 이 글은 앞서 알아본 두 가지 정보를 활용하여 작성되었습니다.

처음엔 이 글에서 PostgreSQL만 다루려고 하였으나 만약의 사태에 대비하여 MySQL을 쓰는 경우에 대해서도 작성해두었습니다. 다만 여기에는 치명적인 한계가 있는데 MySQL은 5.6.17 이상에서만 CBC 모드를 지원합니다. 즉, 그 이하 버전이거나 MariaDB를 쓰는 경우에는 ECB모드를 사용해야합니다. (당연히 보안성이 그만큼 떨어집니다.)

이 글은 MySQL과 PostgreSQL 양쪽 모두를 가능한 선에서 최대한 지원하려고 하였으므로 소스코드가 복잡할 것이고, 아직 최적화를 하지 않은 코드가 몇 있어서1 실제 가져다 쓰려면 조금 수정이 필요할 것입니다.

먼저 SA 함수를 만드려면 class를 만들고 그 클래스의 부모 클래스로 sqlalchemy.sql.functions.FunctionElement를 지정해야합니다. 그리고 함수의 반환형을 지정해주어야 합니다. 나는 encryption한 결과는 LargeBinary(MySQL에선 BLOB, PostgreSQL에선 bytea)로 지정되도록 하고 싶었고, decryption한 결과는 바로 사용 가능하도록 String형으로 하고 싶었습니다.2 따라서 다음과 같이 만들었습니다.

from sqlalchemy.sql.functions import FunctionElement


class aes_encrypt(FunctionElement): #  noqa
    name = 'aes_encrypt'
    type = LargeBinary


class aes_decrypt(FunctionElement): #  noqa
    name = 'aes_decrypt'
    type = String

(여기서 # noqaflake8 검사를 통과하기 위한 것입니다.)

그 다음에 실제 구현을 만들어야합니다. 먼저 (간단한) MySQL 구현부터 만들어보았습니다. MySQL은 암호화에 aes_encrypt, 복호화에 aes_decrypt 함수를 사용합니다. 반환형은 모두 BINARY입니다. 저는 복호화한 결과물을 바로 문자열로써 쓰고 싶으므로 복호화한 결과물은 cast(... AS CHAR)도 해주어야 할 것입니다. 암호화에는 (당연히) iv(initial vector)가 필요하지만 ECB모드로 사용할 경우도 감안해서 iv를 안넣어도 잘 돌아가도록 만들었습니다.

from sqlalchemy.ext.compiler import compiles

@compiles(aes_encrypt, 'mysql')
def _aes_encrypt_mysql(element, compiler, **kw):
    try:
        data, key, iv = list(element.clauses)
        return "aes_encrypt({}, {}, {})".format(
            compiler.process(data),
            compiler.process(key),
            compiler.process(iv)
        )
    except ValueError:
        data, key = list(element.clauses)
        return "aes_encrypt({}, {})".format(
            compiler.process(data),
            compiler.process(key)
        )


@compiles(aes_decrypt, 'mysql')
def _aes_decrypt_mysql(element, compiler, **kw):
    try:
        data, key, iv = list(element.clauses)
        return "cast(aes_decrypt({}, {}, {}) AS CHAR)".format(
            compiler.process(data),
            compiler.process(key),
            compiler.process(iv)
        )
    except ValueError:
        data, key = list(element.clauses)
        return "cast(aes_decrypt({}, {}) AS CHAR)".format(
            compiler.process(data),
            compiler.process(key)
        )

@compiles decorator는 첫 인자로 해당 함수를, 두번째 인자(optinal)로 dbms 종류를 받습니다. decorate된 함수는 element, compiler, **kw의 인자를 받아야합니다. list(element.clauses)를 하면 실제 함수를 실행했을때의 인자들을 받을 수 있습니다. 각 인자에 compiler.process함수를 적용해서 넣는 것을 알 수 있는데, 실제로 SA 내부적으로 인자의 종류에 따라 자료형을 변환하는 등의 작업을 하기 때문에 필요합니다.

이번엔 PostgreSQL의 구현입니다.

from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import bindparam


def _pgsql_parse_args(args, compiler):
    args = list(args)
    args_len = len(args)
    if args_len == 4:
        data, key, iv, set_padding_none = args
    elif args_len == 3:
        data, key, iv = args
        set_padding_none = False
    else:
        data, key = args
        iv = None
        set_padding_none = False

    mode_str = 'aes-cbc/pad:{}'
    if set_padding_none:
        mode_str = mode_str.format('none')
    else:
        mode_str = mode_str.format('pkcs')

    mode = compiler.process(bindparam('mode', mode_str, type_=String))

    return data, key, iv, mode


@compiles(aes_encrypt, 'postgresql')
def _aes_encrypt_pgsql(element, compiler, **kw):
    data, key, iv, mode = _pgsql_parse_args(element.clauses, compiler)

    if iv is not None:
        return "encrypt_iv({}, {}, {}, {})".format(
            compiler.process(data),
            compiler.process(key),
            compiler.process(iv),
            mode,
        )
    else:
        return "encrypt({}, {}, {})".format(
            compiler.process(data),
            compiler.process(key),
            mode,
        )


@compiles(aes_decrypt, 'postgresql')
def _aes_decrypt_pgsql(element, compiler, **kw):
    data, key, iv, mode = _pgsql_parse_args(element.clauses, compiler)

    if iv is not None:
        return (
            "convert_from(decrypt_iv({}, {}, {}, {}), 'UTF8')"
        ).format(
            compiler.process(data),
            compiler.process(key),
            compiler.process(iv),
            mode,
        )
    else:
        return (
            "convert_from(decrypt({}, {}, {}), 'UTF8')"
        ).format(
            compiler.process(data),
            compiler.process(key),
            mode,
        )

PostgreSQL은 AES 암호화에 encrypt, 복호화에 decrypt란 함수를 쓰는데, AES를 사용할 경우 마지막 파라메터로 명시적으로 'aes'라고 넣어줘야 합니다.3 마지막 파라메터는 'aes[-(mode)][/pad:(padding)] 형식인데, 나는 그냥 명시적으로 모두 기입하도록 하였습니다. 여기서 padding이 나오는데, MySQL은 알아서 처리해주는데, PostgreSQL은 명시적으로 padding을 쓰지 않을 수 있으므로 옵션으로 받을 수 있도록 처리해주었습니다. 여기서 한가지 의문을 가질 수 있는 부분은 CBC모드면서 iv를 받지 않아도 실행이 되냐는 점인데, 실행이 됩니다. PgCrypto가 알아서 내부적으로 iv를 관리한다고 합니다.

여기서 또 처음 보는 작업이 나오는데 바로 bindparam입니다. 본디 bindparam은 이렇게 쓰라고 있는 것입니다. (SA docs에서 인용했습니다.)

from sqlalchemy import bindparam

stmt = select([users_table]).\
            where(users_table.c.name == bindparam('username'))

result = connection.execute(stmt, username='wendy')

이렇게 하면 SQL이 다음과 같은 형태로 준비되어있다가 실행됩니다.

SELECT id, name FROM user WHERE name = :username

그런데 저는 이것을 조금 더 이용해먹기로 했습니다. 값과, 실제 유형까지 미리 주고서 compiler가 활용하도록 한 것이죠. 내가 만든 mode_str을 실제 SA에서 사용할 수 있는 자료형으로 만든 것입니다.

그런데 두 DBMS 모두 AES 암호화를 사용하기 위한 전제조건이 있습니다. MySQL은 mode를 지정해주는 SQL을 실행해줘야 활성화되고, PostgreSQL은 아예 extention을 설치해줘야 합니다. 이러한 작업은 DB 접속이 활성화되자마자 진행되어야 하므로 SA의 core event를 사용하기로 하였습니다.

from flask import current_app
from sqlalchemy import event


def install_specific_dialects_event(engine):
    aes_key = current_app.config['AES_KEY']
    aes_key_bit = len(aes_key)*8
    dialect_name = engine.dialect.name
    if dialect_name == 'mysql':
        @event.listens_for(engine, 'connect')
        def mysql_on_connect(conn, record):
            curr = conn.cursor()
            curr.execute(
                "SET block_encryption_mode = 'aes-{}-cbc';".format(
                    aes_key_bit
                )
            )
            curr.close()
    elif dialect_name == 'postgresql':
        @event.listens_for(engine, 'first_connect')
        def pgsql_on_first_connect(conn, record):
            curr = conn.cursor()
            try:
                curr.execute("create extension pgcrypto;")
            except Exception as e:
                if 'extension "pgcrypto" already exists' in str(e):
                    conn.rollback()
                else:
                    raise e
            curr.close()

여기서 aes_key를 구하는 부분이 있는데, MySQL을 위한 부분입니다. 왜냐하면 암호화에 사용할 KEY의 길이를 알려줘야 하기 때문입니다. 여기선 flask.current_app을 사용했는데 딱히 의무적으로 flask를 써야하는 것은 아니므로 필요에 따라 조정해주시면 됩니다. core event를 사용할때의 주의사항은 넘겨받는 conn 변수가 session.connection()으로 받는 것과 다른 종류라는 것입니다. 따라서 conn.cursor()도 SA에서 넘겨받는 cursor와 다른 동작을 합니다. 따라서 실제 에러가 났을 경우에 명시적으로 conn.rollback() 해주어야만 합니다. 그러지 않는다면 첫 SQL 실행이 실패할 것입니다. 그 뒤부터 실행하는건 또 잘 되므로 혼란스러울 수 있습니다.

여기서 만든 함수는 engine을 생성해줄때, 즉 create_engine 함수 아래에서 실행해주면 됩니다.

from sqlalchemy import create_engine


engine = create_engine(db_url)
install_specific_dialects_event(engine)

구현한 소스가 이제 잘 실행되는지 실험해보았습니다. 원활한 실행 진행을 위해 create_engine 함수에 echo=True 옵션을 주었습니다. (실행은 flask의 app_context 안에서 돌고 있습니다. Flask-Script에서 기본 제공하는 shell 명령어와 거의 동일한 환경입니다. 따라서 engine에 뭔가 설치하는 등의 과정은 생략했습니다)

먼저 DBMS로 MySQL을 줄 경우입니다. 이 경우 주의해야하는 점은 몇몇 MySQL connector들이 default charset으로 latin7을 사용하는 만행을 저지르고 있으므로 charset을 꼭 지정해주어야 합니다.

Python 3.4.3 (v3.4.3:9b73f1c3e601, Feb 23 2015, 02:52:03)
[GCC 4.2.1 (Apple Inc. build 5666) (dot 3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>> from testenv.orm import session
>>> from testenv.sql import aes_encrypt, aes_decrypt
>>> key = b'1234567890123456' * 2
>>> iv = b'abcdefghijklmnop'
>>> session.query(aes_decrypt(aes_encrypt('item4의 AES 테스트 현장', key, iv), key, iv)).one()
2015-07-09 20:33:08,223 INFO sqlalchemy.engine.base.Engine SHOW VARIABLES LIKE 'sql_mode'
2015-07-09 20:33:08,223 INFO sqlalchemy.engine.base.Engine ()
2015-07-09 20:33:08,225 INFO sqlalchemy.engine.base.Engine SELECT DATABASE()
2015-07-09 20:33:08,225 INFO sqlalchemy.engine.base.Engine ()
2015-07-09 20:33:08,228 INFO sqlalchemy.engine.base.Engine show collation where `Charset` = 'utf8' and `Collation` = 'utf8_bin'
2015-07-09 20:33:08,229 INFO sqlalchemy.engine.base.Engine ()
2015-07-09 20:33:08,233 INFO sqlalchemy.engine.base.Engine SELECT CAST('test plain returns' AS CHAR(60)) AS anon_1
2015-07-09 20:33:08,233 INFO sqlalchemy.engine.base.Engine ()
2015-07-09 20:33:08,235 INFO sqlalchemy.engine.base.Engine SELECT CAST('test unicode returns' AS CHAR(60)) AS anon_1
2015-07-09 20:33:08,235 INFO sqlalchemy.engine.base.Engine ()
2015-07-09 20:33:08,236 INFO sqlalchemy.engine.base.Engine SELECT CAST('test collated returns' AS CHAR CHARACTER SET utf8) COLLATE utf8_bin AS anon_1
2015-07-09 20:33:08,236 INFO sqlalchemy.engine.base.Engine ()
2015-07-09 20:33:08,238 INFO sqlalchemy.engine.base.Engine SELECT cast(aes_decrypt(aes_encrypt(%s, %s, %s), %s, %s) AS CHAR) AS aes_decrypt_1
2015-07-09 20:33:08,238 INFO sqlalchemy.engine.base.Engine ('item4의 AES 테스트 현장', b'12345678901234561234567890123456', b'abcdefghijklmnop', b'12345678901234561234567890123456', b'abcdefghijklmnop')
('item4의 AES 테스트 현장',)

암호화 후 바로 복호화했는데 원문이 그대로 나왔습니다. 실제 출력된 SQL을 보면 잘 동작함을 알 수 있습니다.

이번엔 PostgreSQL에서 실행해보았습니다.

Python 3.4.3 (v3.4.3:9b73f1c3e601, Feb 23 2015, 02:52:03)
[GCC 4.2.1 (Apple Inc. build 5666) (dot 3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>> from testenv.orm import session
>>> from testenv.sql import aes_encrypt, aes_decrypt
>>> key = b'1234567890123456' * 2
>>> iv = b'abcdefghijklmnop'
>>> session.query(aes_decrypt(aes_encrypt('item4의 AES 테스트 현장 PostgreSQL', key, iv), key, iv)).
2015-07-09 20:37:38,445 INFO sqlalchemy.engine.base.Engine select version()
2015-07-09 20:37:38,445 INFO sqlalchemy.engine.base.Engine {}
2015-07-09 20:37:38,448 INFO sqlalchemy.engine.base.Engine select current_schema()
2015-07-09 20:37:38,449 INFO sqlalchemy.engine.base.Engine {}
2015-07-09 20:37:38,452 INFO sqlalchemy.engine.base.Engine SELECT CAST('test plain returns' AS VARCHAR(60)) AS anon_1
2015-07-09 20:37:38,452 INFO sqlalchemy.engine.base.Engine {}
2015-07-09 20:37:38,453 INFO sqlalchemy.engine.base.Engine SELECT CAST('test unicode returns' AS VARCHAR(60)) AS anon_1
2015-07-09 20:37:38,453 INFO sqlalchemy.engine.base.Engine {}
2015-07-09 20:37:38,453 INFO sqlalchemy.engine.base.Engine show standard_conforming_strings
2015-07-09 20:37:38,453 INFO sqlalchemy.engine.base.Engine {}
2015-07-09 20:37:38,456 INFO sqlalchemy.engine.base.Engine SELECT convert_from(decrypt_iv(encrypt_iv(%(aes_encrypt_1)s, %(aes_encrypt_2)s, %(aes_encrypt_3)s, %(mode)s), %(aes_decrypt_2)s, %(aes_decrypt_3)s, %(mode)s), 'UTF8') AS aes_decrypt_1
2015-07-09 20:37:38,456 INFO sqlalchemy.engine.base.Engine {'aes_encrypt_1': 'item4의 AES 테스트 현장 PostgreSQL', 'aes_decrypt_2': <psycopg2.extensions.Binary object at 0x103dcd620>, 'aes_encrypt_3': <psycopg2.extensions.Binary object at 0x103dcda08>, 'mode': 'aes-cbc/pad:pkcs', 'aes_encrypt_2': <psycopg2.extensions.Binary object at 0x103df91e8>, 'aes_decrypt_3': <psycopg2.extensions.Binary object at 0x103df9288>}
('item4의 AES 테스트 현장 PostgreSQL',)

역시 잘 됨을 확인할 수 있습니다.

이것을 활용하면 맨 처음 인용한 hybrid_property등으로 사용할 수 있을 것입니다. 이러한 응용법은 본 글에서 다루려고 한 범위를 벗어나므로 다음번에 다루려고 합니다.

사실 이 글을 쓸 때 Oracle에서도 동작하게 해볼까 했으나 문제가 있어서 실행에 옮기진 못했습니다. Python용 Oracle connector는 cx_Oracle이란 것인데, 이것의 설치를 위해서는 시스템에 Oracle이 필요합니다. 우회법이 없진 않을 것 같지만 별로 그렇게까지 하고 싶진 않았습니다. Oracle도 AES를 지원하므로 필요한 사람은 이 글을 응용해서 해봐도 좋지 않을까 싶습니다.4

각주.

  1. Flask에 아주 약한 의존성이 있다던가 하는 부분 원래 위치로

  2. 이것은 제가 암호화할 정보들의 거의 대부분이 String이기 때문이기도 합니다. 필요에 따라 변경이 가능합니다. 원래 위치로

  3. 왜냐하면 PgCrypto는 'bf', 즉 Blowfish 알고리즘도 지원하기 때문입니다. 원래 위치로

  4. 단, 오라클은 PKCS#5를 사용합니다. 원래 위치로