
ConnectionInstance._reader() Task was destroyed but it is pending #5043

Closed
danielnelson opened this issue Nov 6, 2015 · 10 comments

@danielnelson

The code below produces a warning from asyncio.

$ cat test.py
#!/usr/bin/env python
import asyncio
import rethinkdb as r

async def stuff():
    conn = await r.connect()
    await conn.close()

r.set_loop_type("asyncio")
loop = asyncio.get_event_loop()
task = loop.create_task(stuff())
loop.run_until_complete(task)
$ python test.py
Task was destroyed but it is pending!
task: <Task pending coro=<ConnectionInstance._reader() running at lib/python3.5/site-packages/rethinkdb/net_asyncio.py:218> wait_for=<Future finished result=None>>

If I run another cycle of the loop by adding a second call to run_until_complete, there is no warning. Perhaps ConnectionInstance.close() should yield from self._reader_task before returning?
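
Something like this standalone sketch illustrates the idea (a toy model, not the driver's actual code; only the _reader/close interplay is meant to mirror the traceback above):

import asyncio

class FakeConn:
    # Toy stand-in for ConnectionInstance; not the real driver code.
    def __init__(self):
        self._closing = asyncio.Event()
        self._reader_task = None

    async def connect(self):
        self._reader_task = asyncio.ensure_future(self._reader())
        return self

    async def _reader(self):
        # Stand-in for the driver's background read loop.
        await self._closing.wait()

    async def close(self):
        self._closing.set()
        if self._reader_task is not None:
            # The proposed fix: wait for the reader to finish before
            # returning, so the loop never destroys a pending task.
            await self._reader_task

async def stuff():
    conn = await FakeConn().connect()
    await conn.close()

asyncio.get_event_loop().run_until_complete(stuff())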

@danielmewes
Member

@Tryneus do you have any idea about this?

danielmewes added this to the 2.2-polish milestone Nov 6, 2015
@sontek
Contributor

sontek commented Nov 12, 2015

I'm also seeing this in my asyncio test:

@asyncio.coroutine
def asyncio_query(future, survey_id):
    data = []
    tconn = yield from r.connect("mt1-rethinkd1c1")
    tconn._get_json_decoder = create_decoder
    print("survey", survey_id)
    q = r.table(TABLE).filter({'survey_id': survey_id})
    cursor = yield from q.pluck(
        "survey_id", "respondent_id", "row_id", "column_id", "value_id"
    ).map(r.row.values()).run(tconn)

    while(yield from cursor.fetch_next()):
        row = yield from cursor.next()
        data.append(row)

    cursor.close()
    tconn.close()
    print("survey: %s, data: %s" % (survey_id, len(data)))
    future.set_result(data)

def get_async_io():
    r.set_loop_type("asyncio")
    loop = asyncio.get_event_loop()

    futures = []
    for i in range(1, 16):
        future = asyncio.Future()
        asyncio.async(asyncio_query(future, i))
        futures.append(future)

    loop.run_until_complete(asyncio.gather(*futures))
    data = []

    for future in futures:
        data += future.result()
    print('got all data')
    loop.close()
    return data

@danielnelson
Author

The workaround I've been using is to shut down like this:

loop.stop()
loop.run_forever()
loop.close()
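
Applied to the repro script from the top of this issue:

import asyncio
import rethinkdb as r

async def stuff():
    conn = await r.connect()
    await conn.close()

r.set_loop_type("asyncio")
loop = asyncio.get_event_loop()
loop.run_until_complete(loop.create_task(stuff()))

# Workaround: stop() before run_forever() makes the loop run exactly one
# more cycle, which gives the orphaned _reader() task a chance to finish
# before the loop is closed.
loop.stop()
loop.run_forever()
loop.close()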

@danielmewes
Member

@Tryneus could you take a look at this please?

@sontek
Contributor

sontek commented Nov 13, 2015

There is also something interesting going on with the asyncio path: it seems to think it's querying a ton more data than it actually is:

[screenshot from 2015-11-12 17-02-40: graph of server reads/sec]

The first small line on the graph is a single query, which it reported at 60k reads per second. The big spike at 1.5 million/sec is the asyncio version. The end result from both queries is 1 million records, so it's odd to see such a difference; in particular, if it really were reading 1.5 million rows per second, the whole thing should take about 1 second, not 12.

Here is the code:

import rethinkdb as r
import random
import rapidjson
import pytz
import time
import yappi
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
UTC = pytz.utc

YAPPI = False
DO_CREATE = False
DO_INSERTS = False
SERVER = "mt1-rethinkd1c1"
#SERVER = "localhost"
TABLE = "test5"

class RapidJsonDecoder(object):
    def __init__(self, reql_format_opts):
        pass

    def decode(self, s):
        return rapidjson.loads(s, precise_float=False)

def create_decoder(format_opts):
    return RapidJsonDecoder(format_opts)

conn = r.connect(SERVER)
conn._get_json_decoder = create_decoder


if DO_CREATE:
    r.table_create(TABLE).run(conn)

START_BIGINT = 100000000000000000
END_BIGINT = 999999999999999999


def utc_now():
    now = datetime.utcnow()
    tz_now = now.replace(tzinfo=UTC)
    return tz_now.timestamp()


def get_rint(start=1000000, end=9999999):
    """
    Generate a very large integer
    :return:
    """
    return random.randint(start, end)


def get_bigint():
    """
    Generate a random BIGINT
    :return:
    """
    return get_rint(start=START_BIGINT, end=END_BIGINT)


if DO_INSERTS:
    objects_to_insert = []

    for i in range(0, 1000000):
        objects_to_insert.append({
            'survey_id': get_rint(start=1, end=15),
            'respondent_id': get_bigint(),
            'row_id': get_bigint(),
            'column_id': get_bigint(),
            'value_id': get_bigint(),
            'rid1': get_bigint(),
            'rid2': get_bigint(),
            'rid3': get_rint(),
            'now': utc_now()
        })

        if i % 5000 == 0:
            print('writing %s' % i)
            r.table(TABLE).insert(objects_to_insert).run(conn, durability="soft")
            objects_to_insert = []

    r.table(TABLE).insert(objects_to_insert).run(conn, durability="soft")

start = time.time()

if YAPPI:
    yappi.set_clock_type('cpu')
    yappi.start(builtins=True)


def query(survey_id):
    data = []
    tconn = r.connect("mt1-rethinkd1c1")
    tconn._get_json_decoder = create_decoder
    q = r.table(TABLE).filter({'survey_id': survey_id})
    results = q.pluck(
        "survey_id", "respondent_id", "row_id", "column_id", "value_id"
    ).map(r.row.values()).run(tconn)

    for row in results:
        data.append(row)

    return data

def get_multi_proc(use_threads=True):
    futures = []

    if use_threads:
        klass = ThreadPoolExecutor
    else:
        klass = ProcessPoolExecutor

    with klass(max_workers=4) as executor:
        for i in range(1, 16):
            future = executor.submit(query, i)
            futures.append(future)

        data = []

    for future in futures:
        data += future.result()

    return data

def get_single():
    # select all
    data = []
#    q = r.table(TABLE)
    q = r.table(TABLE).between(1, 16, index='survey_id')
#    q = r.table(TABLE).filter(
#        (r.row['survey_id'] >= 1) & (r.row['survey_id'] <= 16)
#    )
    result = q.pluck(
        "survey_id", "respondent_id", "row_id", "column_id", "value_id"
    ).map(r.row.values()).run(conn)

    for row in result:
        data.append(row)

    return data

import asyncio

@asyncio.coroutine
def asyncio_query(future, survey_id):
    data = []
    tconn = yield from r.connect("mt1-rethinkd1c1")
    tconn._get_json_decoder = create_decoder
    print("survey", survey_id)
    q = r.table(TABLE).filter({'survey_id': survey_id})
    cursor = yield from q.pluck(
        "survey_id", "respondent_id", "row_id", "column_id", "value_id"
    ).map(r.row.values()).run(tconn)

    while(yield from cursor.fetch_next()):
        row = yield from cursor.next()
        data.append(row)

    cursor.close()
    tconn.close()
    print("survey: %s, data: %s" % (survey_id, len(data)))
    future.set_result(data)

def get_async_io():
    r.set_loop_type("asyncio")
    loop = asyncio.get_event_loop()

    futures = []
    for i in range(1, 16):
        future = asyncio.Future()
        asyncio.async(asyncio_query(future, i))
        futures.append(future)

    final_futures = asyncio.wait(futures)
    loop.run_until_complete(final_futures)

    data = []

    for future in futures:
        data += future.result()
    print('got all data')
    loop.stop()
    # hack to work around bug in rethinkdb driver
    loop.run_forever()
    loop.close()
    return data

data = get_single()
#data = get_multi_proc()
#data = get_multi_proc(use_threads=False)
data = get_async_io()
count = len(data)
end = time.time()

if YAPPI:
    stats = yappi.get_func_stats()
    stats.save('callgrind.out', type='callgrind')
    print('checkout callgrind.out')

print("count is %s" % count)
duration = int(1000 * (end - start))
print("int query took %sms" % duration)

@danielmewes
Member

@sontek Since asyncio_query uses a filter, RethinkDB actually has to read all documents to check whether the filter criteria match, for each of the queries.
If you switch that to .getAll(survey_id, {index: "survey_id"}), I suspect the reads/s graph will match up.
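
For the Python driver that would look something like this (a sketch reusing the names from the snippet above; it assumes a secondary index named survey_id has already been created on the table):

# Indexed lookup instead of a full-table filter. The index would need
# to be created once beforehand, e.g.:
#   r.table(TABLE).index_create("survey_id").run(conn)
q = r.table(TABLE).get_all(survey_id, index="survey_id")
cursor = yield from q.pluck(
    "survey_id", "respondent_id", "row_id", "column_id", "value_id"
).map(r.row.values()).run(tconn)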

danielmewes modified the milestones: 2.2-polish, 2.2.x Nov 13, 2015
@sontek
Contributor

sontek commented Nov 13, 2015

@danielmewes yeah, that made it look more reasonable.

@sontek
Contributor

sontek commented Nov 13, 2015

@Tryneus Looking at the code, there doesn't seem to be a way to signal _reader that it needs to stop: it starts waiting for the next signal but never gets one. Here is a PR that could fix it:

https://github.com/rethinkdb/rethinkdb/pull/5082/files
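
The general shape of such a fix, as a standalone sketch (a toy model, not the actual diff in that PR): give the reader loop something to wake up on at shutdown, then have close() wait for it to exit.

import asyncio

class Reader:
    # Toy model of a connection's background reader loop.
    def __init__(self):
        self._frames = asyncio.Queue()
        self._task = None

    def start(self):
        self._task = asyncio.ensure_future(self._reader())

    async def _reader(self):
        while True:
            frame = await self._frames.get()
            if frame is None:
                # Shutdown sentinel: stop waiting for the next frame.
                return
            print("dispatch:", frame)

    async def close(self):
        self._frames.put_nowait(None)  # signal _reader that it needs to stop
        await self._task               # and wait until it has actually exited

async def main():
    reader = Reader()
    reader.start()
    reader._frames.put_nowait("hello")
    await reader.close()

asyncio.get_event_loop().run_until_complete(main())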

@danielmewes
Member

#5082 was merged. Going to ship in 2.2.1.

danielmewes modified the milestones: 2.2.1, 2.2.x Nov 17, 2015
@danielmewes
Member

We released an updated version of the Python driver (2.2.0.post1) today. It's available through pip and includes the fix by @sontek / @danielnelson.
