Skip to content

Commit

Permalink
Introduce ChannelHandlerInvoker, dedicated for invoking event handle…
Browse files Browse the repository at this point in the history
…r methods, and move most handler invocation code in ChannelHandlerContext to the default ChannelHandlerInvoker implementation

- Fixes #1912
- Add ChannelHandlerInvoker and its default implementation
- Add pipeline manipulation methods that accept ChannelHandlerInvoker
- Rename Channel(Inbound|Outbound)Invoker to
  Channel(Inbound|Outbound)Ops to avoid confusion
- Remove the Javadoc references to the package-private interfaces
  • Loading branch information
trustin committed Nov 21, 2013
1 parent 883ab29 commit 132af3a
Show file tree
Hide file tree
Showing 72 changed files with 1,598 additions and 928 deletions.
Expand Up @@ -51,7 +51,7 @@ public void testTooLargeHeaderNameOnSynStreamRequest() throws Exception {
testTooLargeHeaderNameOnSynStreamRequest(SpdyVersion.SPDY_3_1);
}

private void testTooLargeHeaderNameOnSynStreamRequest(final SpdyVersion version) throws Exception {
private static void testTooLargeHeaderNameOnSynStreamRequest(final SpdyVersion version) throws Exception {
List<Integer> headerSizes = Arrays.asList(90, 900);
for (final int maxHeaderSize : headerSizes) { // 90 catches the header name, 900 the value
SpdyHeadersFrame frame = new DefaultSpdySynStreamFrame(1, 0, (byte) 0);
Expand Down
Expand Up @@ -16,9 +16,8 @@
package io.netty.util.concurrent;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.RunnableFuture;
Expand All @@ -29,24 +28,43 @@
*/
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {

static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;

private final EventExecutorGroup parent;

protected AbstractEventExecutor() {
this(null);
}

protected AbstractEventExecutor(EventExecutorGroup parent) {
this.parent = parent;
}

@Override
public EventExecutorGroup parent() {
return parent;
}

@Override
public EventExecutor next() {
return this;
}

@Override
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
@SuppressWarnings("unchecked")
public <E extends EventExecutor> Set<E> children() {
return Collections.singleton((E) this);
}

@Override
public Iterator<EventExecutor> iterator() {
return new EventExecutorIterator();
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}

@Override
public Future<?> shutdownGracefully() {
return shutdownGracefully(2, 15, TimeUnit.SECONDS);
return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
}

/**
Expand Down Expand Up @@ -131,27 +149,4 @@ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDela
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
throw new UnsupportedOperationException();
}

private final class EventExecutorIterator implements Iterator<EventExecutor> {
private boolean nextCalled;

@Override
public boolean hasNext() {
return !nextCalled;
}

@Override
public EventExecutor next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
nextCalled = true;
return AbstractEventExecutor.this;
}

@Override
public void remove() {
throw new UnsupportedOperationException("read-only");
}
}
}
Expand Up @@ -23,12 +23,13 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static io.netty.util.concurrent.AbstractEventExecutor.*;


/**
* Abstract base class for {@link EventExecutorGroup} implementations.
*/
public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {

@Override
public Future<?> submit(Runnable task) {
return next().submit(task);
Expand Down Expand Up @@ -66,7 +67,7 @@ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialD

@Override
public Future<?> shutdownGracefully() {
return shutdownGracefully(2, 15, TimeUnit.SECONDS);
return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
}

/**
Expand Down
Expand Up @@ -16,15 +16,36 @@
package io.netty.util.concurrent;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;

/**
* Default {@link SingleThreadEventExecutor} implementation which just executes all submitted tasks in a
* serial fashion.
*
*/
final class DefaultEventExecutor extends SingleThreadEventExecutor {
public final class DefaultEventExecutor extends SingleThreadEventExecutor {

DefaultEventExecutor(DefaultEventExecutorGroup parent, Executor executor) {
public DefaultEventExecutor() {
this((EventExecutorGroup) null);
}

public DefaultEventExecutor(ThreadFactory threadFactory) {
this(null, threadFactory);
}

public DefaultEventExecutor(Executor executor) {
this(null, executor);
}

public DefaultEventExecutor(EventExecutorGroup parent) {
this(parent, new DefaultThreadFactory(DefaultEventExecutor.class));
}

public DefaultEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory) {
super(parent, threadFactory, true);
}

public DefaultEventExecutor(EventExecutorGroup parent, Executor executor) {
super(parent, executor, true);
}

Expand Down
31 changes: 31 additions & 0 deletions common/src/main/java/io/netty/util/concurrent/EventExecutor.java
Expand Up @@ -15,6 +15,10 @@
*/
package io.netty.util.concurrent;

import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

/**
* The {@link EventExecutor} is a special {@link EventExecutorGroup} which comes
* with some handy methods to see if a {@link Thread} is executed in an event loop.
Expand All @@ -30,6 +34,12 @@ public interface EventExecutor extends EventExecutorGroup {
@Override
EventExecutor next();

/**
* Returns an unmodifiable singleton set which contains itself.
*/
@Override
<E extends EventExecutor> Set<E> children();

/**
* Return the {@link EventExecutorGroup} which is the parent of this {@link EventExecutor}.
*/
Expand Down Expand Up @@ -69,4 +79,25 @@ public interface EventExecutor extends EventExecutorGroup {
* every call of blocking methods will just return without blocking.
*/
<V> Future<V> newFailedFuture(Throwable cause);

@Override
Future<?> submit(Runnable task);

@Override
<T> Future<T> submit(Runnable task, T result);

@Override
<T> Future<T> submit(Callable<T> task);

@Override
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

@Override
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

@Override
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

@Override
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
Expand Up @@ -15,8 +15,8 @@
*/
package io.netty.util.concurrent;

import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -27,11 +27,11 @@
* to shut them down in a global fashion.
*
*/
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
public interface EventExecutorGroup extends ScheduledExecutorService {

/**
* Returns {@code true} if and only if this executor was started to be
* {@linkplain #shutdownGracefully() shut down gracefully} or was {@linkplain #isShutdown() shut down}.
* Returns {@code true} if and only if all {@link EventExecutor}s managed by this {@link EventExecutorGroup}
* are being {@linkplain #shutdownGracefully() shut down gracefully} or were {@linkplain #isShutdown() shut down}.
*/
boolean isShuttingDown();

Expand Down Expand Up @@ -59,7 +59,8 @@ public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<E
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);

/**
* Returns the {@link Future} which is notified when this executor has been terminated.
* Returns the {@link Future} which is notified when all {@link EventExecutor}s managed by this
* {@link EventExecutorGroup} have been terminated.
*/
Future<?> terminationFuture();

Expand All @@ -78,16 +79,14 @@ public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<E
List<Runnable> shutdownNow();

/**
* Returns one of the {@link EventExecutor}s that belong to this group.
* Returns one of the {@link EventExecutor}s managed by this {@link EventExecutorGroup}.
*/
EventExecutor next();

/**
* Returns a read-only {@link Iterator} over all {@link EventExecutor}s which are handled by this
* {@link EventExecutorGroup} at the time this method is invoked.
* Returns the unmodifiable set of {@link EventExecutor}s managed by this {@link EventExecutorGroup}.
*/
@Override
Iterator<EventExecutor> iterator();
<E extends EventExecutor> Set<E> children();

@Override
Future<?> submit(Runnable task);
Expand Down
Expand Up @@ -64,11 +64,6 @@ private GlobalEventExecutor() {
delayedTaskQueue.add(purgeTask);
}

@Override
public EventExecutorGroup parent() {
return null;
}

/**
* Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
*
Expand Down
Expand Up @@ -21,18 +21,14 @@
* {@link AbstractEventExecutor} which executes tasks in the caller's thread.
*/
public final class ImmediateEventExecutor extends AbstractEventExecutor {

public static final ImmediateEventExecutor INSTANCE = new ImmediateEventExecutor();

private final Future<?> terminationFuture = new FailedFuture<Object>(
GlobalEventExecutor.INSTANCE, new UnsupportedOperationException());

private ImmediateEventExecutor() {
// use static instance
}

@Override
public EventExecutorGroup parent() {
return null;
// Singleton
}

@Override
Expand Down
Expand Up @@ -16,8 +16,7 @@
package io.netty.util.concurrent;

import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
Expand All @@ -31,6 +30,7 @@
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

private final EventExecutor[] children;
private final Set<EventExecutor> readonlyChildren;
private final AtomicInteger childIndex = new AtomicInteger();
private final AtomicInteger terminatedChildren = new AtomicInteger();
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
Expand Down Expand Up @@ -62,7 +62,7 @@ protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object.
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}

children = new SingleThreadEventExecutor[nThreads];
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
Expand Down Expand Up @@ -104,6 +104,10 @@ public void operationComplete(Future<Object> future) throws Exception {
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}

Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

protected ThreadFactory newDefaultThreadFactory() {
Expand All @@ -115,11 +119,6 @@ public EventExecutor next() {
return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}

@Override
public Iterator<EventExecutor> iterator() {
return children().iterator();
}

/**
* Return the number of {@link EventExecutor}s this implementation uses. This number maps
* 1:1 to the threads it uses.
Expand All @@ -128,13 +127,10 @@ public final int executorCount() {
return children.length;
}

/**
* Return a safe-copy of all of the children of this group.
*/
protected Set<EventExecutor> children() {
Set<EventExecutor> children = Collections.newSetFromMap(new LinkedHashMap<EventExecutor, Boolean>());
Collections.addAll(children, this.children);
return children;
@Override
@SuppressWarnings("unchecked")
public final <E extends EventExecutor> Set<E> children() {
return (Set<E>) readonlyChildren;
}

/**
Expand Down

0 comments on commit 132af3a

Please sign in to comment.