# Line-number gutter (1-93) of the original code listing; the script itself starts below.
 | import logging
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger('test')
 
import sys

# (major, minor) of the running interpreter, as a plain tuple.
PyVER = sys.version_info[:2]
# True on Python 3.1 and newer.
PyV3 = (3, 1) <= PyVER
# True on Python 2.7 only.
PyV2 = (2, 7) == PyVER
import sqlalchemy

# The script is written against the SQLAlchemy 0.8 series.  Raise explicitly
# rather than using ``assert`` so the check still runs under ``python -O``
# and reports which version was actually found.
if not sqlalchemy.__version__.startswith('0.8'):
    raise AssertionError(
        'this script expects SQLAlchemy 0.8.x, found %s'
        % sqlalchemy.__version__)
 
import inspect
def trace():
    """Log every frame of the current call stack at DEBUG level.

    The stack is walked outward starting from this function's own frame, so
    the first record describes ``trace`` itself and the last one describes
    the outermost caller.
    """
    frames = inspect.getouterframes(inspect.currentframe())
    try:
        for frame_info in frames:
            # Lazy %-argument: the text is only rendered if the record is
            # actually emitted.
            log.debug('%s', frame_info)
    finally:
        # ``frames`` references this function's own frame, whose locals in
        # turn reference ``frames``.  Drop both names so no reference cycle
        # outlives the call.
        frame_info = None
        del frames
 
 
if PyV2:
    import sqlalchemy.types as types

    class MyString(types.TypeDecorator):
        """String column type for Python 2.7 that converts at the DB boundary.

        Byte strings (``str``) are decoded from UTF-8 on their way into a
        statement, and ``unicode`` results are encoded back to UTF-8 byte
        strings.  Both directions log the type they received.
        """

        # Underlying SQLAlchemy type that this decorator wraps.
        impl = types.String

        def process_bind_param(self, value, dialect):
            """Decode a non-empty ``str`` bind value from UTF-8.

            Falsy values (``None``, empty string) are returned unchanged.
            The current call stack is logged after decoding.
            """
            if not value:
                return value
            log.debug('*** process_bind_param: got type=%s', type(value))
            assert isinstance(value, str)
            decoded = value.decode('utf-8')
            # Record where the bind conversion was triggered from.
            trace()
            return decoded

        def process_result_value(self, value, dialect):
            """Encode a non-empty ``unicode`` result value to UTF-8.

            Falsy values (``None``, empty string) are returned unchanged.
            """
            if not value:
                return value
            log.debug('*** process_result_value: got type=%s', type(value))
            assert isinstance(value, unicode)
            return value.encode('utf-8')
 
 
if PyV3:
    import sqlalchemy.dialects.sqlite as dialect

    class MyString(dialect.VARCHAR):
        """SQLite VARCHAR for Python 3 whose processors log value types.

        Both processors delegate the conversion to the parent class and log
        the type of the value before and after that conversion.
        """

        def bind_processor(self, dialect):
            """Return a bind processor that logs types around the parent's."""

            def unchanged(value):
                # Fallback used when the parent supplies no processor.
                return value

            convert = super().bind_processor(dialect) or unchanged

            def logged(value):
                log.debug('*** bind_processor: got type=%s', type(value))
                converted = convert(value)
                log.debug('*** bind_processor: returns type=%s',
                          type(converted))
                return converted

            return logged

        def result_processor(self, dialect, coltype):
            """Return a result processor that logs types around the parent's."""

            def unchanged(value):
                # Fallback used when the parent supplies no processor.
                return value

            convert = super().result_processor(dialect, coltype) or unchanged

            def logged(value):
                log.debug('*** result_processor: got type=%s', type(value))
                converted = convert(value)
                log.debug('*** result_processor: return type=%s',
                          type(converted))
                return converted

            return logged
 
 
if __name__ == '__main__':
    # Round-trip one row through a table that uses MyString and log what
    # comes back, so the type conversions logged by MyString can be observed.
    import os
    from sqlalchemy import MetaData, Table, create_engine, Column, select, String

    # Set to a filesystem path to use an on-disk database file (any existing
    # file at that path is deleted first); None selects an in-memory database.
    path = None

    # MyString is defined differently per interpreter version, and so is the
    # way it has to be constructed.
    if PyV2:
        name_type = MyString(50)
    elif PyV3:
        name_type = MyString(50, convert_unicode='force')
    else:
        raise RuntimeError(
            'unsupported Python version %s.%s: need 2.7 or 3.1+' % PyVER)

    meta = MetaData()
    test_table = Table('test_table', meta,
                       Column('name', name_type),
                       Column('data', String(255)))

    if path is None:
        path = ':memory:'
    elif os.path.exists(path):
        # Start from a fresh database file.  A configured path that does not
        # exist yet is kept as-is instead of falling back to ':memory:'.
        os.remove(path)

    engine = create_engine('sqlite:///%s' % path)
    # Bind the metadata to the engine; the create/insert/select calls below
    # are issued without passing an explicit connection.
    meta.bind = engine
    test_table.create()
    test_table.insert().execute(name='foo', data='some data')
    rows = select([test_table.c.name, test_table.c.data]).execute().fetchall()
    for name, data in rows:
        log.debug('name: %s, data=%s', name, data)
# "Partager" ("Share"): forum page control captured with the listing, not part of the script.