WL#7802: PERFORMANCE SCHEMA, BATCH TABLE IO

Status: Complete

Implement changes in the table io instrumentation to record statistics in
batches of N operations, instead of N times 1 operation.

This applies to selected use cases only (per join scan) and needs explicit
calls from the optimizer to begin and end a batch.

This should significantly decrease the instrumentation overhead for table io.

Performance Schema
==================

NF-1 Table io instrumentation

This is a Non Functional requirement.
The instrumentation API for table io is extended to also pass
the number of rows affected by the table io operation.
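
As an illustration, in the PSI v1 interface style the end-of-wait entry point
for table io would take an extra row count (a sketch only; the exact typedef
and parameter names may differ in the final patch):

typedef void (*end_table_io_wait_v1_t)(struct PSI_table_locker *locker,
                                       ulonglong numrows);

Non-batched operations can then simply pass numrows = 1.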

F-2 Table performance_schema.events_waits_current

For table io events, the column NUMBER_OF_BYTES indicates the number of rows
affected by the table io operation.
Previously, this column was NULL.

F-3 Table performance_schema.events_waits_history

Same as F-2

F-4 Table performance_schema.events_waits_history_long

Same as F-2

Server Performance requirements
===============================

P-5 Performance schema overhead

When executing a payload with the table io instrumentation turned on:

- P-5-1 for workloads instrumented in batch mode, the performance schema
  overhead should decrease compared to the previous implementation

- P-5-2 for workloads not instrumented in batch mode, the performance schema
  overhead should not increase compared to the previous implementation

The proposal involves per-scan reporting of row fetch operations for some parts
of a query.

This is a simple solution that is less accurate than the current per-row
implementation, but the inaccuracy should still be acceptable.

The MySQL executor performs joins using a nested-loop implementation. The job of
the PFS instrumentation is to provide row count and accumulated execution time
per table in a join operation. Today, this is achieved by reporting every row
fetch in every stage of the nested loop individually to the performance schema,
where it is aggregated per table afterwards.

Let us assume a join query of the following form:

SELECT ... FROM t1 JOIN t2 ON ... JOIN t3 ON ...

and a table order t1 - t2 - t3.

If there is a fanout greater than 1 for table t3, the majority of row fetch
operations will be for this table. Hence, we can significantly reduce the
number of instrumented operations if we report these operations to the
performance schema aggregated per scan (i.e. per unique combination of rows
from t1 and t2). Let us say that we access 10 rows from table t1, 10 rows from
table t2 per row from t1, and 10 rows from table t3 per row of table t2. The
total number of instrumented row operations is 10 * 10 * 10 + 10 * 10 + 10 =
1110.

If we instead instrument each scan of the inner-most table t3 rather than each
row, the number of instrumented operations is reduced to 10 * 10 + 10 * 10 +
10 = 210 (one operation per scan of t3, plus the unchanged per-row operations
for t1 and t2), i.e. a reduction of about 80%.
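
A back-of-envelope check of these numbers (a standalone snippet, not part of
the server code):

#include <cstdio>

int main() {
  const long f1 = 10, f2 = 10, f3 = 10;              // rows fetched per level
  const long per_row  = f1 * f2 * f3 + f1 * f2 + f1; // one op per row: 1110
  const long per_scan = f1 * f2 + f1 * f2 + f1;      // one op per t3 scan: 210
  printf("per-row: %ld, per-scan: %ld, reduction: %.0f%%\n",
         per_row, per_scan, 100.0 * (per_row - per_scan) / per_row);
  return 0;
}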

Algorithm:

Do regular row instrumentation for all table operations, except those where

1. We are accessing the inner-most table of a query block, and
2. We are not requesting a single row from that table (eq_ref), and
3. We are not evaluating a subquery containing table access for that table.

For such tables, we switch from instrumenting row operations to instrumenting
full scans (a toy model follows the list below) by:

1. Before starting the access, signal to the storage handler that scan
   instrumentation will be used.

2. On first access to the handler, start the timer and reset number of rows
   to zero.

3. On each successful row fetch in the handler, increment the row count, but
   do not report to performance schema.

4. On first unsuccessful row fetch (error, eof), report row count and
   execution time to performance schema.

5. If the scan is aborted by the nested-loop executor (e.g. because a LIMIT
   was reached or first-match completed), signal the handler that the scan is
   complete. The handler will then report the accumulated data to the
   performance schema.
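
As a toy model of steps 1-5, the following self-contained snippet mimics the
intended handler-side accounting. All names here (toy_handler,
report_table_io, the m_psi_* members) are illustrative stand-ins, not the
server's actual identifiers:

#include <cstdio>

typedef unsigned long long ulonglong;

// Stand-in for reporting one table io event to the performance schema.
static void report_table_io(ulonglong numrows) {
  printf("reported batch of %llu row(s)\n", numrows);
}

class toy_handler {
  bool      m_psi_batch_mode = false;  // set while a batched scan is active
  bool      m_psi_started    = false;  // stands in for the PSI locker/timer
  ulonglong m_psi_numrows    = 0;      // rows fetched but not yet reported
  int       m_rows_left      = 5;      // fake table content

 public:
  void start_psi_batch_mode() {        // step 1: optimizer opts in
    m_psi_batch_mode = true;
    m_psi_numrows = 0;
  }

  int rnd_next() {                     // simplified row fetch
    if (m_psi_batch_mode && !m_psi_started)
      m_psi_started = true;            // step 2: first access starts the timer
    if (m_rows_left-- > 0) {
      m_psi_numrows++;                 // step 3: count, do not report yet
      return 0;
    }
    end_psi_batch_mode();              // step 4: eof/error flushes the batch
    return 1;                          // eof stand-in
  }

  void end_psi_batch_mode() {          // step 5: also called on abort
    if (m_psi_started)
      report_table_io(m_psi_numrows);  // one event covering N rows
    m_psi_started = false;
    m_psi_batch_mode = false;
  }
};

int main() {
  toy_handler h;
  h.start_psi_batch_mode();
  while (h.rnd_next() == 0) {}         // scan runs to eof; one report emitted
  return 0;
}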

Problem:

Instrumenting data per scan has one important drawback over a solution that
instruments per row or per batch of rows: All execution time that is consumed
above the storage engine is also accumulated into the row fetch time. This means
that overhead due to shipping rows to client, or doing join buffering, or doing
aggregation, will also be accumulated into the reported execution time. Hence,
execution time data for this inner-most table will be less accurate than data
for all other tables. 

Scan operations NOT covered by this worklog:
--------------------------------------------

There are some other sequential operations that might be covered, but which are
not currently:

1. Internal temporary tables are not covered.
These tables are used in grouping and/or sorting, where a single sorted/grouped
table is produced and then read sequentially. This sequential read can be
controlled just like single-table scans. Such tables are associated with
QEP_TAB objects with index >= primary_tables, so we could extend coverage to
them by testing this >= join->qep_tab + join->primary_tables - 1 instead of
the current equality.
In addition, we would need to instrument QEP_tmp_table::end_send() as is done
in sub_select().
However, PFS does not currently instrument internal temporary tables, so this
task is left for the future.

2. Tables involved in join buffering are not covered.
Consider the operation t1 JOIN t2 implemented with join buffering. Currently,
we use PFS batch mode on table t2. However, table t1 is also read in large
chunks, so it may also be a candidate for PFS batch mode. The condition for
adding such tables is a bit complex, though.

3. Tables being materialized in semi-join
We have two patterns for materialization of subqueries through semi-join:
a) materialize-lookup: t1 - mat - SJM(t2)
Which means:
- Join the materialized table "mat" with t1, using an eq_ref operation on "mat".
- Before reading from "mat", materialize it from table t2.
Here, the table being materialized (t2) is the last QEP_TAB, and thus PFS batch
mode may be used when reading it during materialization. This is the only place
PFS batch mode may be used, so no extra work is needed.

b) materialize-scan: mat - t1 - SJM(t2)
Which means:
- Join the materialized table "mat" with t1.
- Before reading "mat", materialize it from table t2.
Here too, the table being materialized (t2) is the last QEP_TAB, and thus PFS
batch mode may be used when reading it during materialization. We could also
use PFS batch mode for table t1 in the join operation.
Again, the condition for adding such tables is a bit complex.

CODE CHANGES
============
1. enum_nested_loop_state
sub_select(JOIN *join, JOIN_TAB *join_tab, bool end_of_records)

Signal the handler to end PFS batch mode for a single-table scan. This happens
when:
  A. The last row is read.
  B. The first unsuccessful row fetch occurs (error, eof).
  C. The scan is aborted by the nested-loop executor (e.g. because a LIMIT was
     reached or first-match completed).

The sub_select() function drives a single-table scan; it fetches each row in a
loop. Before fetching the first row, we enable PFS batch mode on the handler;
when the loop ends, we disable it. Whether the scan ends with the last row
(eof), with an error, or by being aborted, the loop must terminate, so a
single call after the loop ends batch mode in all three cases. A condensed
sketch follows.
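
Sketch of the intended placement (heavily abbreviated; the QEP_TAB-based
signature and the start_psi_batch_mode()/end_psi_batch_mode() handler entry
points are the names assumed by this worklog and may differ in the final
patch):

enum_nested_loop_state
sub_select(JOIN *join, QEP_TAB *const qep_tab, bool end_of_records)
{
  ...
  const bool pfs_batch_update= qep_tab->pfs_batch_update(join);
  if (pfs_batch_update)
    qep_tab->table()->file->start_psi_batch_mode(); // batch starts on first fetch

  while (rc == NESTED_LOOP_OK && join->return_tab >= qep_tab_idx)
  {
    int error= info->read_record(info);             // one row per iteration
    ...
  }

  /*
    The loop exits on eof (case A), error (case B) or abort (case C),
    so one call here covers all three cases.
  */
  if (pfs_batch_update)
    qep_tab->table()->file->end_psi_batch_mode();
  ...
}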

2. enum_nested_loop_state JOIN_CACHE::join_records(bool skip_last)

The join_records() function drives a join operation that uses join buffering.
It calls join_matching_records(), which performs the join using the
BNL/BKA/BKA-UNIQUE algorithms. So we can enable PFS batch mode before calling
join_matching_records() and disable it after it returns, as sketched below.
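
Sketch of the placement (abbreviated; it is assumed here that the cache object
can reach its QEP_TAB and JOIN, e.g. through qep_tab and join members):

enum_nested_loop_state JOIN_CACHE::join_records(bool skip_last)
{
  ...
  if (qep_tab->pfs_batch_update(join))
    qep_tab->table()->file->start_psi_batch_mode();

  rc= join_matching_records(skip_last);   // BNL/BKA/BKA-UNIQUE join pass

  if (qep_tab->pfs_batch_update(join))
    qep_tab->table()->file->end_psi_batch_mode();
  ...
}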

3. Add a new function to QEP_TAB:
bool QEP_TAB::pfs_batch_update(JOIN *join)

This function decides, based on the conditions below, whether MySQL should use
PFS batch mode for this table. It is called by sub_select() and join_records().
 
bool QEP_TAB::pfs_batch_update(JOIN *join)
{
  /*
    Use PFS batch mode unless
     1. tab is not an inner-most table, or
     2. a table has eq_ref or const access type, or
     3. this tab contains a subquery that accesses one or more tables
  */

  return !((join->qep_tab + join->primary_tables - 1) != this || // 1
           this->type() == JT_EQ_REF ||                          // 2
           this->type() == JT_CONST  ||
           this->type() == JT_SYSTEM ||
           (condition() && condition()->has_subquery()));        // 3
}
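
Note that the three negated conditions map directly to conditions 1-3 in the
algorithm section above; JT_CONST and JT_SYSTEM are grouped with JT_EQ_REF
because all three access types return at most one row.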