from threading import Thread, Event
from queue import Queue, Empty, Full
from time import sleep
import sqlite3
import os
def writer(queue, chunk_size, db_path='./test.sqlite3', *,
           stop_event=None, poll_timeout=0.5):
    """Drain ``queue`` into a fresh SQLite table, committing in batches.

    Any existing file at ``db_path`` is removed, a table ``test(a, b, c)``
    is created, and every item taken from ``queue`` is inserted as one row.
    Rows are written ``chunk_size`` at a time, one transaction per batch.
    The loop ends once the queue has been found empty while the stop event
    is set; rows still buffered at that point are written before returning.

    Args:
        queue: Source of 3-item rows; must provide ``get(timeout=...)``
            raising ``queue.Empty``.
        chunk_size: Number of rows to accumulate before each insert.
        db_path: Path of the SQLite database file to (re)create.
        stop_event: Object with ``is_set()`` signalling that no more items
            will arrive. Defaults to the module-level ``done`` event.
        poll_timeout: Seconds to wait for an item before re-checking
            ``stop_event``.
    """
    if stop_event is None:
        # Backward-compatible default: callers that do not pass an event
        # rely on the module-level ``done`` flag.
        stop_event = done

    # Always start from an empty database file.
    if os.path.exists(db_path):
        os.remove(db_path)

    cnx = sqlite3.connect(db_path)
    try:
        cnx.execute('create table test(a, b, c)')
        buffer = []

        def flush():
            # ``with cnx`` commits the batch (or rolls back on error).
            if buffer:
                with cnx:
                    cnx.executemany(
                        'insert into test values(?, ?, ?)',
                        buffer)
                buffer.clear()

        while True:
            try:
                item = queue.get(timeout=poll_timeout)
            except Empty:
                # Only stop once the queue is drained AND the producer has
                # signalled completion; otherwise keep polling.
                if stop_event.is_set():
                    break
                continue
            buffer.append(item)
            if len(buffer) >= chunk_size:
                flush()

        # Write the final partial batch instead of dropping it.
        flush()
    finally:
        cnx.close()
def reader(queue, count=100, delay=0.02):
    """Produce ``count`` sample rows onto ``queue`` without blocking.

    Each row is the tuple ``(i, 'yyy-<i>', 'yyy-<i>')`` for ``i`` from 0 to
    ``count - 1``. After every put the function sleeps ``delay`` seconds.
    Because ``put_nowait`` is used, ``queue.Full`` propagates to the caller
    when a bounded queue has no free slot.
    """
    for index in range(count):
        label = f'yyy-{index}'
        row = (index, label, label)
        queue.put_nowait(row)
        sleep(delay)
if __name__ == '__main__':
    # Demo parameters: batch size for the writer, number of rows to
    # produce, and the pause between two produced rows (seconds).
    CHUNK_SIZE = 10
    COUNT = 1000
    DELAY = 0.002

    # ``done`` must keep this module-level name: writer() looks it up as a
    # global to learn that the producer has finished.
    done = Event()
    # Bounded queue, so a producer that outruns the writer gets ``Full``.
    queue = Queue(maxsize=CHUNK_SIZE)

    writer_thread = Thread(
        target=writer,
        kwargs={'queue': queue, 'chunk_size': CHUNK_SIZE},
    )
    writer_thread.start()

    try:
        reader(queue, count=COUNT, delay=DELAY)
    except Full:
        # The writer did not keep up and the bounded queue overflowed.
        print('failed')
    except Exception as exc:
        print('unexpected exception', exc)
    else:
        print('Done.')
    finally:
        # Always release the writer, whatever happened to the producer,
        # and wait for it to finish.
        done.set()
        writer_thread.join()
Partager