ConnectionInstance._reader() Task was destroyed but it is pending #5043
@Tryneus do you have any idea about this?
I'm also seeing this with my test using asyncio:

```python
@asyncio.coroutine
def asyncio_query(future, survey_id):
    data = []
    tconn = yield from r.connect("mt1-rethinkd1c1")
    tconn._get_json_decoder = create_decoder
    print("survey", survey_id)
    q = r.table(TABLE).filter({'survey_id': survey_id})
    cursor = yield from q.pluck(
        "survey_id", "respondent_id", "row_id", "column_id", "value_id"
    ).map(r.row.values()).run(tconn)
    while (yield from cursor.fetch_next()):
        row = yield from cursor.next()
        data.append(row)
    cursor.close()
    tconn.close()
    print("survey: %s, data: %s" % (survey_id, len(data)))
    future.set_result(data)


def get_async_io():
    r.set_loop_type("asyncio")
    loop = asyncio.get_event_loop()
    futures = []
    for i in range(1, 16):
        future = asyncio.Future()
        asyncio.async(asyncio_query(future, i))
        futures.append(future)
    loop.run_until_complete(asyncio.gather(*futures))
    data = []
    for future in futures:
        data += future.result()
    print('got all data')
    loop.close()
    return data
```
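For context, the warning in the issue title is asyncio's generic complaint when a `Task` is still pending at the time it gets garbage collected, typically because the loop was closed or the program exited first. A minimal standalone reproduction, independent of RethinkDB and written in the same Python 3.4-era generator-coroutine style used in this thread (it uses `asyncio.ensure_future` rather than the thread's `asyncio.async`, since `async` later became a keyword):

```python
import asyncio

@asyncio.coroutine
def forever():
    # a task that never finishes on its own
    while True:
        yield from asyncio.sleep(1)

loop = asyncio.get_event_loop()
asyncio.ensure_future(forever())             # schedule the never-ending task
loop.run_until_complete(asyncio.sleep(0.1))  # run briefly, then stop
loop.close()                                 # the task is still pending here
# At interpreter exit the pending Task is garbage collected and asyncio
# logs: "Task was destroyed but it is pending!"
```

The driver hits the same condition because `ConnectionInstance._reader()` is still pending when the loop is closed.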
The workaround I've been using is to shut down like this:
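Presumably this refers to the loop-restart sequence that appears, flagged with a "hack to work around bug in rethinkdb driver" comment, in the full script later in this thread:

```python
loop.stop()
# hack to work around bug in rethinkdb driver
loop.run_forever()
loop.close()
```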
@Tryneus could you take a look at this please?
There is also something interesting going on with the asyncio stuff, because it seems to think it's querying a ton more data than it actually is. The first small line on the graph is a single query, which was measured at about 60k reads per second. The big boost to 1.5 million/sec is the async IO run. The end result from both queries is 1 million records, so it's odd to see such a difference, especially since, if it were really getting 1.5 million rows per second, the whole thing should take 1 second, not 12. Here is the code:

```python
import rethinkdb as r
import random
import rapidjson
import pytz
import time
import yappi
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

UTC = pytz.utc

YAPPI = False
DO_CREATE = False
DO_INSERTS = False

SERVER = "mt1-rethinkd1c1"
# SERVER = "localhost"
TABLE = "test5"


class RapidJsonDecoder(object):
    def __init__(self, reql_format_opts):
        pass

    def decode(self, s):
        return rapidjson.loads(s, precise_float=False)


def create_decoder(format_opts):
    return RapidJsonDecoder(format_opts)


conn = r.connect(SERVER)
conn._get_json_decoder = create_decoder

if DO_CREATE:
    r.table_create(TABLE).run(conn)

START_BIGINT = 100000000000000000
END_BIGINT = 999999999999999999


def utc_now():
    now = datetime.utcnow()
    tz_now = now.replace(tzinfo=UTC)
    return tz_now.timestamp()


def get_rint(start=1000000, end=9999999):
    """
    Generate a very large integer
    :return:
    """
    return random.randint(start, end)


def get_bigint():
    """
    Generate a random BIGINT
    :return:
    """
    return get_rint(start=START_BIGINT, end=END_BIGINT)


if DO_INSERTS:
    objects_to_insert = []
    for i in range(0, 1000000):
        objects_to_insert.append({
            'survey_id': get_rint(start=1, end=15),
            'respondent_id': get_bigint(),
            'row_id': get_bigint(),
            'column_id': get_bigint(),
            'value_id': get_bigint(),
            'rid1': get_bigint(),
            'rid2': get_bigint(),
            'rid3': get_rint(),
            'now': utc_now()
        })
        if i % 5000 == 0:
            print('writing %s' % i)
            r.table(TABLE).insert(objects_to_insert).run(conn, durability="soft")
            objects_to_insert = []
    r.table(TABLE).insert(objects_to_insert).run(conn, durability="soft")

start = time.time()

if YAPPI:
    yappi.set_clock_type('cpu')
    yappi.start(builtins=True)


def query(survey_id):
    data = []
    tconn = r.connect("mt1-rethinkd1c1")
    tconn._get_json_decoder = create_decoder
    q = r.table(TABLE).filter({'survey_id': survey_id})
    results = q.pluck(
        "survey_id", "respondent_id", "row_id", "column_id", "value_id"
    ).map(r.row.values()).run(tconn)
    for row in results:
        data.append(row)
    return data


def get_multi_proc(use_threads=True):
    futures = []
    if use_threads:
        klass = ThreadPoolExecutor
    else:
        klass = ProcessPoolExecutor
    with klass(max_workers=4) as executor:
        for i in range(1, 16):
            future = executor.submit(query, i)
            futures.append(future)
    data = []
    for future in futures:
        data += future.result()
    return data


def get_single():
    # select all
    data = []
    # q = r.table(TABLE)
    q = r.table(TABLE).between(1, 16, index='survey_id')
    # q = r.table(TABLE).filter(
    #     (r.row['survey_id'] >= 1) & (r.row['survey_id'] <= 16)
    # )
    result = q.pluck(
        "survey_id", "respondent_id", "row_id", "column_id", "value_id"
    ).map(r.row.values()).run(conn)
    for row in result:
        data.append(row)
    return data


import asyncio


@asyncio.coroutine
def asyncio_query(future, survey_id):
    data = []
    tconn = yield from r.connect("mt1-rethinkd1c1")
    tconn._get_json_decoder = create_decoder
    print("survey", survey_id)
    q = r.table(TABLE).filter({'survey_id': survey_id})
    cursor = yield from q.pluck(
        "survey_id", "respondent_id", "row_id", "column_id", "value_id"
    ).map(r.row.values()).run(tconn)
    while (yield from cursor.fetch_next()):
        row = yield from cursor.next()
        data.append(row)
    cursor.close()
    tconn.close()
    print("survey: %s, data: %s" % (survey_id, len(data)))
    future.set_result(data)


def get_async_io():
    r.set_loop_type("asyncio")
    loop = asyncio.get_event_loop()
    futures = []
    for i in range(1, 16):
        future = asyncio.Future()
        asyncio.async(asyncio_query(future, i))
        futures.append(future)
    final_futures = asyncio.wait(futures)
    loop.run_until_complete(final_futures)
    data = []
    for future in futures:
        data += future.result()
    print('got all data')
    loop.stop()
    # hack to work around bug in rethinkdb driver
    loop.run_forever()
    loop.close()
    return data


data = get_single()
# data = get_multi_proc()
# data = get_multi_proc(use_threads=False)
data = get_async_io()
count = len(data)

end = time.time()

if YAPPI:
    stats = yappi.get_func_stats()
    stats.save('callgrind.out', type='callgrind')
    print('checkout callgrind.out')

print("count is %s" % count)
duration = int(1000 * (end - start))
print("int query took %sms" % duration)
```
@sontek Since the …
@danielmewes yeah, that made it look more reasonable
@Tryneus Looking at the code, there doesn't seem to be a way to signal to …
#5082 was merged. Going to ship in 2.2.1.
We released an updated version of the Python driver (…)
The code below produces a warning from asyncio. If I run another cycle of the loop by adding a second call to `run_until_complete`, then there is no warning. Perhaps `ConnectionInstance.close()` should yield from its `self._reader_task` before returning?
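A minimal sketch of what that proposed fix might look like, assuming `close()` is a coroutine method on the asyncio `ConnectionInstance` and `_reader_task` holds the Task wrapping `_reader()` (the names come from this issue; the actual driver internals may differ):

```python
import asyncio

@asyncio.coroutine
def close(self):
    # existing teardown elided: closing the underlying stream makes the
    # _reader() coroutine fall out of its read loop
    ...
    # proposed change: wait for the reader task to actually finish before
    # returning, so it is never left pending when the event loop is closed
    if self._reader_task is not None:
        yield from self._reader_task
```

Until a fixed driver is available, running the loop for one more cycle after closing the connection (e.g. a second `run_until_complete` call, as described above) gives the pending `_reader()` task a chance to finish.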