Support incremental map/reduce #1118

Open
coffeemug opened this issue Jul 3, 2013 · 82 comments

@coffeemug
Contributor

We've had this on our radar for a while, but didn't have an issue to track it. Since some people have been asking for an official issue to track, I'm adding this to GitHub.

I'm going to write up a specific proposal a bit later. This is in backlog as it's obviously a medium-term priority feature.

@jdoliner
Contributor

jdoliner commented Jul 3, 2013

How bad would it be if incremental map/reduce jobs could only be registered on a single table? If we limited ourselves to that, this would actually become a much simpler problem to solve in the backend.

@coffeemug
Contributor Author

Hmm, I have to think about it. It might be sufficient for most real use cases, but at first glance this makes me feel really uneasy. One thing MongoDB does that people find extremely annoying is introduce features that don't work with other features of the database. For example, they have plenty of collection types that cannot be sharded, which makes the user experience really frustrating since it moves the burden from developers to users. People can't just use the features they want and have the confidence that they will work.

(I don't think it necessarily means we should restrict functionality, just that this tradeoff comes with frustrating connotations for users, so we should think carefully before we choose to do it this way.)

@apendleton

Like, a single incremental job could only operate on data from a single table? Or that each database could only have one table on which incremental jobs could be registered?

For the use case I had in mind when I asked the HN question that I think prompted this ticket, the former would be acceptable, but the latter wouldn't. I have no idea what other uses people have in mind, though.

@jdoliner
Contributor

jdoliner commented Jul 3, 2013

@apendleton the former is what I meant. To give people an idea of how much easier it is: I think I could probably do the one-table case in less than a month, while the general case would probably take as many as 4-5 months all told. I think it's a feature on about the same scale as secondary indexes, which took about that long.

I actually think we should ship the one-table case sometime semi-soon (post 2.0, probably), gauge people's response to it, and then expand from there. Also, if we had triggers, the one-table limitation really wouldn't be that bad, because you could write triggers to push data from wherever you want into your single table, where it would get map/reduced. We'd add some sugar on top of that and it could actually be really nice. On top of that, a lot of the features for managing tables are ones you actually want for this incremental map/reduce stuff as well. Redundancy will make the computed value more available. Sharding can help it scale better.

@coffeemug
Contributor Author

@jdoliner -- when you get the chance, could you explain the design for each of the options (i.e. the single-table option and the multi-table option)? I'd like to understand how you envision each version would work and where the factor of four-to-five difference in complexity comes from. (Obviously not urgent since we aren't doing this now.)

@apendleton

@jdoliner yeah, that all sounds awesome. We have a currently-Postgres database that I think I want to eventually replace with something-not-Postgres TBD. We build aggregates on a whole bunch of tables that are very expensive to compute, and we currently recompute everything from scratch on updates (additions, deletions, and changes of records). There's occasionally inter-table stuff, but 90% or more is probably single-table, and if we could change records and get new aggregates without recomputing everything from scratch, that would be a huge boon. I think you're absolutely right, too, that that use case is probably much more common than a complicated multi-table MR situation, and that in the interest of 80%-20% solutions, getting the single-table case out the door early would be totally worthwhile.

@jdoliner
Contributor

jdoliner commented Jul 5, 2013

@coffeemug actually, having thought about this a bit more, I think the multi-table version of this is less a question of being complicated from an engineering perspective and more a question of being algorithmically untenable. Even a fairly simple multi-table map/reduce such as table.eq_join("foo", table2).map(...).reduce(...) is very complicated to keep track of in an incremental way, and in a lot of cases downright impossible. A single row change in table2 can conceivably change the value of every single piece of data going into the map/reduce, so there's really just no efficient way to compute an incremental view without basically rerunning the map/reduce for every change to table2. We could maybe make some optimizations that would be more efficient if you had an approximately one-to-one join (which is probably the most common case), but that's going to be a big undertaking that only works in very specific cases, which will be hard to explain to people and will behave very badly when used outside of those cases. Furthermore, if people start using arbitrary subexpressions like table.map(lambda x: query_on_table2(x)).reduce(lambda x,y: query_on_table3(x,y)), then all bets are off.

I definitely agree that it's annoying to have two features which aren't compatible, but I think the reality is that this is a situation where you can't sugarcoat the algorithmic limitations. Doing so is just going to lead to people bumping into the limitations as exponential runtimes, which is clearly a lot worse.

My conclusion here is that the easier approach of having map/reduce jobs rooted on a single table is actually the right thing to do, because it's something I know we can make fast and turn into a very useful feature. It's also really very doable, because almost all of the annoying parts are already written and "working" for secondary indexes, and the system was designed to be easily extended to support incremental map/reduce. I'll write up a full proposal for this at some point in the near future.

@coffeemug
Contributor Author

@jdoliner -- this makes a lot of sense. I changed my mind -- I think it's ok to make this feature work on a single table and it's probably ok to never make it work on multiple tables. Actually, we already have precedent where we do best effort on non-deterministic queries, and generally handle them differently from deterministic ones. This would be no different.

@coffeemug
Contributor Author

Moving to 1.14. We should debate the ReQL aspects of it, in case we decide to do it.

Very roughly:

r.table('users').avg('age').changes()
r.table('users').group('city').avg('age').changes()
r.table('users').group('city').reduce(reduction_fn, antireduction_fn).changes()

#2542 has some discussion of what this should return. I think:

  • We shouldn't persist things on disk for v1. If the query dies, the user reruns it and recomputes the first value.
  • We should ship prepackaged inverse functions for the common aggregators.

@srh
Contributor

srh commented Jun 19, 2014

That doesn't seem like incremental map reduce to me. I would expect it to involve some kind of persisted thing that you can query on any connection, not something that requires a live changefeed to be open.

@apendleton

@srh yes, that was what I meant when I asked about it on HN last year; it's what Couch has and refers to by that name. You basically register a map/reduce job and its results are kept up to date automatically as the records it ran over are changed, deleted, or added.

@coffeemug
Contributor Author

For the moment, I'm shooting for something very different with this feature. The spec above would give people the ability to get instantaneous updates to values of many different types of queries. They wouldn't persist on restart (or even on disconnect), but for a variety of reasons, I think that's sufficient for v1. It would require a bunch of infrastructure work, and would leave the door open to later include persistent incremental map/reduce support (where the user would save the query), but I think we should do that separately in future releases. I've opened #2587 to track that.

@mlucy
Member

mlucy commented Jun 23, 2014

It's worth noting that doing this without persistence will make it very hard to track changes on large tables unless you're 100% sure the client will never get disconnected.

@coffeemug
Contributor Author

I think that's ok. We wouldn't market this feature as incremental map/reduce -- we'd market it as instantaneous updates to the result of a query (well, not quite like this, we'd have to find better wording, but you get the idea). Essentially, you pay the price of running a query, and then get any updates in realtime. We'll phrase it in such a way as to not confuse people, and not have them expect things that aren't quite true yet.

We can then deal with large tables in #2587.

@coffeemug
Contributor Author

Related to #2542.

@coffeemug
Contributor Author

Talked to @mlucy in person:

  • He convinced me persistence is more important than I thought
  • He's going to write up a proposal

@danielmewes
Member

What I find interesting about CouchDB's implementation is that they don't require an inverse reduction function.

Instead they seem to store the intermediate reduction results. For example if your reduction function is (x, y) -> x+y and you have documents [1, 2, 3, 4], they would store the following results:

a: 1 + 2 -> 3
b: 3 + 4 -> 7
c: a + b -> 10

(i.e. build a binary reduction tree and store the intermediate results at each node)

Now if we, say, update the first value from 1 to 10, they only have to recompute log_2(n) results:

a': 10 + 2 -> 12
c': a' + b -> 19

This makes it more convenient for the user, since they don't have to come up with an inverse function (which might also be wrong, which we can't detect).

It's definitely more difficult to implement.
Also, I believe our reduction tree is currently heavily unbalanced? That doesn't matter so much right now (unless the reduction function is extremely expensive), but it would have to be changed to work incrementally without an inverse reduction.
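
For illustration, here is a minimal plain-JavaScript sketch of that tree idea -- not RethinkDB code. It stores every intermediate reduction result in an array-backed binary tree, so updating one element only recomputes the O(log n) nodes on the path to the root. For simplicity it assumes the number of leaves is a power of two and that the reduction is associative and commutative.

// Build a tree over `values`, keeping every intermediate reduction result.
function buildTree(values, reduceFn) {
  const n = values.length;
  const tree = new Array(2 * n);
  for (let i = 0; i < n; i++) tree[n + i] = values[i];                               // leaves
  for (let i = n - 1; i >= 1; i--) tree[i] = reduceFn(tree[2 * i], tree[2 * i + 1]); // internal nodes
  return { tree, n, reduceFn };
}

// Point update: only the ancestors of the changed leaf are recomputed.
function update({ tree, n, reduceFn }, index, newValue) {
  let i = n + index;
  tree[i] = newValue;
  for (i = Math.floor(i / 2); i >= 1; i = Math.floor(i / 2)) {
    tree[i] = reduceFn(tree[2 * i], tree[2 * i + 1]);
  }
  return tree[1]; // the root holds the full reduction
}

// The example from the comment above: [1, 2, 3, 4] reduced with (x, y) -> x + y.
const t = buildTree([1, 2, 3, 4], (x, y) => x + y);
console.log(t.tree[1]);        // 10
console.log(update(t, 0, 10)); // 19 -- only log2(4) = 2 internal nodes recomputed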

@mlucy
Member

mlucy commented Jun 25, 2014

So, there are advantages to both designs. Here are my thoughts on maintaining a tree:

Pros:

  • Harder for the user to mess up.
  • Works with a wider variety of functions (e.g. min and max).

Cons:

  • It takes up space. Like, O(n) in the size of the table. This can add up quickly.
  • It's more work to implement.
  • It's usually slower.

I would lean toward the inverse solution because it's easier, it scales better, and I would guess most people will be using our aggregators (sum, avg, etc.) rather than their own, and we can provide the inverse functions for them.

For min and max, we can say that we only offer live min and max on an index. (We first need to implement min and max on an index, but we should do that anyway.)

When we eventually make sample an aggregator, we can solve the inverse problem for sample by just making it fast in all cases (if we implement constant-time count by storing counts in our btree nodes, this won't be all that hard).
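
To make the inverse-function approach concrete, here is a small plain-JavaScript sketch (illustrative only, not an actual RethinkDB API) of apply/inverse pairs for count, sum, and avg; avg has to carry a running sum and count so that removals stay O(1). min and max are deliberately absent, matching the point above that they can't be inverted from the aggregate alone and need an index instead.

// Illustrative apply/inverse pairs for the common aggregators.
// Each aggregator keeps a small accumulator that can absorb one added
// or removed element in O(1).
const aggregators = {
  count: {
    init: () => 0,
    apply: (acc) => acc + 1,
    inverse: (acc) => acc - 1,
    emit: (acc) => acc,
  },
  sum: {
    init: () => 0,
    apply: (acc, x) => acc + x,
    inverse: (acc, x) => acc - x,
    emit: (acc) => acc,
  },
  avg: {
    // The average alone is not invertible, so track sum and count.
    init: () => ({ sum: 0, count: 0 }),
    apply: (acc, x) => ({ sum: acc.sum + x, count: acc.count + 1 }),
    inverse: (acc, x) => ({ sum: acc.sum - x, count: acc.count - 1 }),
    emit: (acc) => (acc.count === 0 ? null : acc.sum / acc.count),
  },
};

// Example: maintain an average while elements come and go.
let acc = aggregators.avg.init();
acc = aggregators.avg.apply(acc, 10);
acc = aggregators.avg.apply(acc, 20);
acc = aggregators.avg.inverse(acc, 10);
console.log(aggregators.avg.emit(acc)); // 20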

@danielmewes
Member

so at 2.4, we can do counts on changefeeds? that will be awesome!

Yes, that's the idea :)
The initial count will run a bit slower in a changefeed for 2.4 compared to running just the count without changes. But we can optimize this in the future to get almost the same performance.

@meenie

meenie commented Jun 10, 2016

If I could do a changefeed on an aggregate that calculates NPS, that would be amazing. I have data that looks something like this:

const npsData = [
  {
    "component_id": 1,
    "number": 10
  },
  {
    "component_id": 1,
    "number": 10
  },
  {
    "component_id": 2,
    "number": 8
  },
  {
    "component_id": 1,
    "number": 9
  },
  {
    "component_id": 2,
    "number": 2
  },
  ...
];

And my query looks something like this:

r.expr(npsData)
  .group('component_id', 'number').count()
  .ungroup()
  .map((row) => {
    const number = row('group').nth(1);
    const ret = r.expr({
      component_id: row('group').nth(0),
      distribution: [{number: number, total: row('reduction')}],
      total_answers: row('reduction'),
      detractors: 0,
      passives: 0,
      promoters: 0
    });

    return r.branch(
      number.eq(9).or(number.eq(10)),
      ret.merge({promoters: ret('promoters').add(row('reduction'))}),
      number.eq(7).or(number.eq(8)),
      ret.merge({passives: ret('passives').add(row('reduction'))}),
      ret.merge({detractors: ret('detractors').add(row('reduction'))})
    );
  })
  .group('component_id')
  .reduce((left, right) => ({
    component_id: left('component_id'),
    total_answers: left('total_answers').add(right('total_answers')),
    detractors: left('detractors').add(right('detractors')),
    passives: left('passives').add(right('passives')),
    promoters: left('promoters').add(right('promoters')),
    distribution: left('distribution').add(right('distribution')),
  }))
  .do((datum) => {
    const passivesPercentage = datum('passives').div(datum('total_answers')).mul(100);
    const promotersPercentage = datum('promoters').div(datum('total_answers')).mul(100);
    const detractorsPercentage = datum('detractors').div(datum('total_answers')).mul(100);
    return {
      distribution: datum('distribution'),
      passives_percentage: passivesPercentage,
      promoters_percentage: promotersPercentage,
      detractors_percentage: detractorsPercentage,
      score: promotersPercentage.sub(detractorsPercentage)
    };
  })
  .ungroup()
  .map(row => ({
    component_id: row('group'),
    distribution: row('reduction')('distribution'),
    passives_percentage: row('reduction')('passives_percentage'),
    promoters_percentage: row('reduction')('promoters_percentage'),
    detractors_percentage: row('reduction')('detractors_percentage'),
    score: row('reduction')('score')
  }));

Could the above be achieved with this new API?

@v3ss0n

v3ss0n commented Jun 10, 2016

@danielmewes Excellent!

@danielmewes
Member

@meenie It depends on whether or not you can express it as a reduce operation and whether there's an efficient "reverse" function that updates the query result when a document gets removed from the input set.

@meenie

meenie commented Jun 10, 2016

@danielmewes So I wouldn't be able to use group()? And I'd have to do those counts manually using reduce? And ya, I believe you could reverse the above because you keep track of the distribution.

@danielmewes
Member

@meenie You might be able to rewrite the grouping into a reduction, in which case it would work. You would basically maintain an object {group1: group1Value, group2: group2Value, ...} in the reduction. This might become inefficient if there are a lot of groups, because a new object will be constructed every time the reduction function is called.
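
As a plain-JavaScript illustration of that rewrite (a sketch of the idea, not ReQL; the field names follow the NPS example above): each row is mapped to a single-group accumulator, and two accumulators are combined with an associative merge, which is the shape a reduce-based changefeed could maintain incrementally.

// Map one row to an accumulator holding a single group's partial count.
function rowToAccumulator(row) {
  return { [row.component_id]: 1 };
}

// Associative, commutative merge of two accumulators of the shape
// {group1: count1, group2: count2, ...}
function combine(a, b) {
  const out = { ...a };
  for (const group of Object.keys(b)) {
    out[group] = (out[group] || 0) + b[group];
  }
  return out;
}

// Example: per-component counts without using group().
const rows = [
  { component_id: 1, number: 10 },
  { component_id: 1, number: 9 },
  { component_id: 2, number: 8 },
];
const counts = rows.map(rowToAccumulator).reduce(combine, {});
console.log(counts); // { '1': 2, '2': 1 }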

@meenie

meenie commented Jun 10, 2016

@danielmewes: Ya, that makes total sense. For now, we need every bit of efficiency we can get, so I'll be experimenting with rewriting our queries to use changefeeds, but I won't utilise this in production until it's on par speed-wise.

@deontologician
Contributor

deontologician commented Jun 10, 2016

r.table('test').filter(...).reduce{|a,b| ...}.changes(reverse: lambda {|acc, o| ...})
r.table('test').filter(...).reduce(reverse: lambda {|acc, o| ...}){|a,b| ...}

Since the reverse function is only needed because it's a changefeed, it seems like the first one makes sense. But then again it's not clear which function it's reversing if they're separated. I guess it feels kind of wrong to me to require something extra when you're doing .changes vs. a normal query.

(to clarify, I know why we have to do it in this case, but it pulls me towards option 1 over option 2)

@danielmewes
Member

danielmewes commented Jun 10, 2016

I guess it feels kind of wrong to me to require something extra when you're doing .changes vs. a normal query.

The way I think of it is that this is like needing to have the {index: ...} optarg for orderBy if you want to have a changefeed on it. I can see that this is slightly different because the reverse option will not have any effect unless you open a changefeed, but I don't feel like that's a big issue.

I don't like the first syntax because it seems limiting and different from what we do anywhere else.

What if in the future we allow changefeeds on queries that contain multiple reduce operations (for example within a subquery)? Specifying the function in changes would not work for that.

Or what if you have a query that looks like this: tbl.reduce(...).do(...).changes()? Ignoring the fact that we currently don't support do in changefeeds (which we totally should), it becomes much less obvious what the reverse argument to changes actually applies to and how it works. Does it get applied to the value after or before the do?

@deontologician
Contributor

Yeah, it seems like the best way is to provide the reverse function to the reduce term. I'm assuming if you don't tack on .changes the reverse optarg will just be a no-op (vs. erroring)?

@danielmewes
Member

I'm assuming if you don't tack on .changes the reverse optarg will just be a no-op (vs. erroring)?

Yeah that's what I thought. That way you can run the same query with and without .changes.

@mlucy
Member

mlucy commented Jun 13, 2016

There's a lot of discussion above. Here's my understanding of the
current proposal:

  • You can write any of these:
    • stream.avg(...).changes()
    • stream.sum(...).changes()
    • stream.count(...).changes()
    • stream.reduce(..., reverse: FUNC).changes()
  • In particular, it doesn't need to be on a selection;
    .map.reduce.changes etc. are legal. (We should probably support
    concat_map.reduce.changes even though we don't yet support
    .concat_map.changes, since it's easy.)

A few other things:

  • Should we support .coerce_to(...).changes()? The most common
    would probably be .coerce_to('array').changes(), where we'd
    re-send the whole array every time it changes. There's an argument
    that coercing to an array is a terminal, so it might be more
    consistent to support it.
  • Should we support .group.reduce.changes? There's no real
    technical limitation, it would be almost as easy as not supporting
    it. If so, should we also take this opportunity to support
    .group.changes?
  • How should we handle reductions over nothing?
    (E.g. r.table('test').avg('foo').changes() when test changes
    from empty to non-empty -- what's old_val?) Currently sum and
    count return 0 on empty streams, while avg and reduce produce
    an error.
    • We should probably use 0 as the "nothing" value for the
      terminals that return it on an empty stream.
    • One option would be to just use nil as the "nothing" value for
      all other terminals.
    • Another option would be to error by default, but to let people
      write e.g. .avg('foo').default(whatever).changes() to specify
      it explicitly.

Also, on the subject of implementation, it probably wouldn't actually be
that hard to do it the efficient way where we do chunks of reductions
on the shards and only ship the aggregates over. It would only speed
up the initial computation, but it would probably speed it up a lot.
(The reason I don't think it would be particularly hard is that we're
already only tracking timestamps on a per-read-transaction basis, so
we wouldn't lose any fidelity if we attached a terminal to the reads
we ship over and got back a pre-aggregated value alongside the
stamps.)
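
As a sketch of what the last option for empty streams might look like from the JS driver (proposal only, nothing here is shipped behavior; the table and field names are made up):

// Proposed: an explicit "nothing" value for a reduction over an empty stream.
r.table('test')
  .avg('foo')
  .default(null) // value to emit as old_val/new_val while the table is empty
  .changes()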

@v3ss0n

v3ss0n commented Jun 13, 2016

.coerce_to(...).changes() looks like a convenience function; it looks good, but it should be optional.
What I found exciting is group.changes and group.reduce.changes.

@danielmewes
Member

@mlucy Thanks for the summary of the current proposal. That matches what I had in mind for 2.4.

I'd like to add coerce_to(...).changes() from your suggestions to this as it seems trivial to do.

The three extensions that you're suggesting

  1. .coerce_to(...).changes()
  2. .group(...).reduce(...).changes()
  3. more efficient implementation for the initial result

all sound really cool to me.

As far as I can tell, 1 would be easy to implement even as a pure fold-based rewrite, at least for coerceTo('array'). We can just keep the current array in the accumulator. I'm not sure if there are any other types that we allow coercing to from a stream? 'string' maybe? Most likely those would also be easy to support; we would still maintain the array, but then call a final .coerceTo(...) on the accumulator array before emitting a value. In any case, doing this will be O(n) in the number of results, but that's expected since the output per change is already of that size.

My impression is that 2 (group.reduce.changes) is a bit more involved in terms of having to figure out how to represent added and removed groups in the output stream.

Since we have limited remaining development resources for 2.4 considering the other things we are working on, my suggestion would be that we agree on a minimal proposal, and keep extensions 2 and 3 out of the proposal for now. If we end up having extra time, we can still implement the more efficient algorithm (3) or discuss grouped changefeeds separately.

How should we handle reductions over nothing?

Great question.
My opinion is that we should emit them as the value null for avg and reduce (and 0 for sum and count). Our current changefeeds already use null to indicate the absence of a value. I think this would fit pretty nicely.
Reporting them as errors sounds nice on paper, but I think in practice it will be a much bigger pain for our users to handle.

@danielmewes
Member

Also I would like to add that I'm extremely excited about this feature! It's going to be so amazing :-)

@v3ss0n

v3ss0n commented Jun 14, 2016

Can we have 2 and 3 in 2.5? :D

@danielmewes
Member

@v3ss0n I think so :)

@mlucy
Member

mlucy commented Jun 14, 2016

@danielmewes -- leaving 2 and 3 for later sounds good to me. I don't think 2's representation would be a particularly involved discussion, though -- I was imagining we'd just emit the entire grouped data every time it changed (so {old_val: {grp: red, grp2: red2}, new_val: {grp: red}}). If we wanted to support plain old .group.changes that would require thinking a little about the format, though.

On coerce_to, I think coerce_to('array') and coerce_to('object') are the only ones that can take a stream.

@danielmewes
Member

Marking settled as:

  • You can write any of these:
    • stream.avg(...).changes()
    • stream.sum(...).changes()
    • stream.count(...).changes()
    • stream.reduce(..., reverse: FUNC).changes()
    • stream.coerceTo('array').changes()
    • stream.coerceTo('object').changes()
  • In particular, it doesn't need to be on a selection;
    .map.reduce.changes etc. are legal. (We should probably support
    concat_map.reduce.changes even though we don't yet support
    .concat_map.changes, since it's easy.)

For 2.4 we will implement the slower variant that performs the initial reduction on the parsing node rather than distributing it.
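
For reference, the settled forms might look like this from the JS driver (a sketch of the proposal only, not shipped syntax; in particular the reverse optarg notation for JavaScript and the users table are assumptions):

// Live aggregates: the result is re-emitted whenever the underlying value changes.
r.table('users').avg('age').changes()
r.table('users').count().changes()

// Custom reduction with the proposed reverse function, so removed elements
// can be subtracted out instead of recomputing from scratch.
r.table('users')
  .map((user) => user('age'))
  .reduce((left, right) => left.add(right), { reverse: (acc, gone) => acc.sub(gone) })
  .changes()

// Terminal coercions covered by the settled proposal.
r.table('users').coerceTo('array').changes()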

@marshall007
Contributor

marshall007 commented Jun 22, 2016

This is gonna be great! Note that supporting .coerce_to('array').changes() solves the use-case I had for #3719, so I would say we probably don't need anything from that proposal anymore. I also prefer these semantics over the optarg from the other proposal.

@nighelles

This is in review 3714, except for coerce_to("array")

@nighelles nighelles self-assigned this Jul 29, 2016
@danielmewes danielmewes modified the milestones: 2.4, 2.4-polish Oct 14, 2016
@dvergeylen

dvergeylen commented Oct 17, 2016

As this has been transferred to milestone 2.4-polish (for obvious reasons), I just wanted to emphasize what @danielmewes has written at this comment as a workaround for the time being. I suggest the following to support the other common aggregation operations (a client-side sketch for sum() follows after the list):

  • sum(): Very similar to count() but we increment by the value of el instead of 1.
    • <f_BASE> = 0
    • <f_APPLY> = function(acc, el) { return acc.add(el); }
    • <f_UNAPPLY> = function(acc, el) { return acc.sub(el); }
    • <f_EMIT> = function(acc) { return acc; }
  • min():
    • <f_BASE> = Number.POSITIVE_INFINITY
    • <f_APPLY> = function(acc, el) { return (el < acc ? el : acc); }
    • <f_UNAPPLY> = function(acc, el) { return acc; }
    • <f_EMIT> = function(acc) { return acc; }
  • max():
    • <f_BASE> = Number.NEGATIVE_INFINITY
    • <f_APPLY> = function(acc, el) { return (el > acc ? el : acc); }
    • <f_UNAPPLY> = function(acc, el) { return acc; }
    • <f_EMIT> = function(acc) { return acc; }
  • avg(): Similar to what Daniel did, but I would handle empty sets with a one liner like this:
    • <f_EMIT> = function(acc) { return (acc('c').neq(0) ? acc('sum').div(acc('c')) : 0); }

Any thoughts on this would be appreciated. 😃
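
A minimal client-side sketch for the sum() case above, using the official JS driver (the scores table, its value field, and the connection handling are assumptions; with includeInitial, initial rows arrive with only a new_val, so only <f_APPLY> fires for them):

const r = require('rethinkdb');

// <f_BASE>, <f_APPLY>, <f_UNAPPLY>, <f_EMIT> for sum(), matching the list above.
const fBase = 0;
const fApply = (acc, el) => acc + el.value;
const fUnapply = (acc, el) => acc - el.value;
const fEmit = (acc) => acc;

async function watchSum(conn) {
  let acc = fBase;
  const cursor = await r.table('scores')
    .changes({ includeInitial: true })
    .run(conn);
  cursor.each((err, change) => {
    if (err) throw err;
    if (change.old_val) acc = fUnapply(acc, change.old_val); // removal, or old version on update
    if (change.new_val) acc = fApply(acc, change.new_val);   // insertion, or new version on update
    console.log('current sum:', fEmit(acc));
  });
}

// Usage: r.connect({ host: 'localhost', port: 28015 }).then(watchSum);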
