В статье описывается подход к реализации на Java шаблона Активный объект, основанный на использовании аспектно-ориентированного расширения Java AspectJ и проекта Zephyr, добавляющего в Java легковесные потоки. Цель подхода — обойти недостатки существующих реализаций данного шаблона и сделать новую реализацию более прозрачной.
Активный объект — шаблон проектирования, который отделяет выполнение метода от его вызова. Шаблон позволяет повысить параллелизм и упростить синхронный доступ к объекту, который живет в собственном потоке.
Элементами шаблона являются Proxy (объект-заместитель), Method Request (запрос), Activation Queue (очередь), Scheduler (планировщик), Servant (обслуживающий объект) и Future.
Объект-заместитель предоставляет интерфейс, позволяющий клиентам вызывать публично доступные методы активного объекта использую стандартные, строго типизированные средства языка программирования вместо передачи слабо типизированных сообщений между потоками. Когда клиент вызывает метод, определяемый объектом-заместителем, создается запрос, который помещается в очередь. Все это происходит в клиентском потоке.
Запрос используется для передачи контекстной информации о вызове определенного метода, такой как параметры вызова, от объекта-заместителя планировщику, работающему в отдельном потоке. Для каждому метода активного объекта, предоставляемого объектом-заместителем, определяется конкретный подкласс абстрактного класса запроса. Экземпляры этих классов создаются объектом-заместителем, когда вызываются его методы, и содержат контекстную информацию, необходимую для того, чтобы выполнить эти методы и вернуть результаты обратно клиентам.
Очередь обеспечивает ограниченный буфер запросов, созданных объектом-заместителем и ожидающих выполнения. Очередь отделяет клиентский поток от потока обслуживающего объекта, поэтому два потока могут работать параллельно.
Планировщик работает в собственном потоке отличном от клиентских потоков и управляет очередью запросов, ожидающих выполнения. Планировщик решает, какой запрос извлечь из очереди следующим, чтобы выполнить его на обслуживающем объекте, реализующем соответствующий метод.
Обслуживающий объект определяет поведение и состояние активного объекта и реализует методы, определенные в объекте-заместителе и соответствующих запросах. Метод обслуживающего объекта вызывается, когда соответствующий запрос выполняется планировщиком, следовательно, обслуживающий объект выполняется в потоке планировщика.
Future позволяет клиенту получить результат вызова метода, после того как обслуживающий объект завершит выполнение этого метода. Когда клиент вызывает метод, Future возвращается клиенту сразу же. Для получения результата вызова клиент опрашивает или блокируется на Future до тех пор, пока результат не будет доступен.
В качестве примера реализации шаблона Активного объекта можно привести следующий код на Java.
public class NormalClass {
private double val = 0.0;
public void doSomething() {
val = 1.0;
}
public void doSomethingElse() {
val = 2.0;
}
}
public class MyTask {
private double val;
private final BlockingQueue<Runnable> dispatchQueue = new LinkedBlockingQueue<>();
public MyTask() {
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
dispatchQueue.take().run();
} catch (InterruptedException e) {
// okay, just terminate the dispatcher
}
}
}
}).start();
}
public void doSomething() throws InterruptedException {
dispatchQueue.put(new Runnable() {
@Override
public void run() {
val = 1.0;
}
});
}
public void doSomethingElse() throws InterruptedException {
dispatchQueue.put(new Runnable() {
@Override
public void run() {
val = 2.0;
}
});
}
}
public interface Squarer {
void squareDontCare(int i); //fire-forget
Future<Integer> square(int i); //non-blocking send-request-reply
Option<Integer> squareNowPlease(int i); //blocking send-request-reply
int squareNow(int i); //blocking send-request-reply
}
public class SquarerImpl implements Squarer {
@Override
public void squareDontCare(int i) {
int sq = i * i; //Nobody cares :(
}
@Override
public Future<Integer> square(int i) {
return Futures.successful(i * i);
}
@Override
public Option<Integer> squareNowPlease(int i) {
return Option.some(i * i);
}
@Override
public int squareNow(int i) {
return i * i;
}
}
public class Main {
public static void main(String[] args) throws Exception {
ActorSystem system = ActorSystem.create();
Squarer mySquarer = TypedActor.get(system).typedActorOf(new TypedProps<>(Squarer.class, SquarerImpl.class));
mySquarer.squareDontCare(10);
Future<Integer> fSquare = mySquarer.square(10);
Option<Integer> oSquare = mySquarer.squareNowPlease(10);
int iSquare = mySquarer.squareNow(10);
assert Await.result(fSquare, Duration.create(3, TimeUnit.SECONDS)).intValue() == 100;
assert oSquare.get().intValue() == 100;
assert iSquare == 100;
TypedActor.get(system).stop(mySquarer);
system.shutdown();
}
}
Пять безмолвных философов сидят вокруг круглого стола, перед каждым философом стоит тарелка спагетти. Вилки лежат на столе между каждой парой ближайших философов.
Каждый философ может либо есть, либо размышлять. Приём пищи не ограничен количеством оставшихся спагетти — подразумевается бесконечный запас. Тем не менее, философ может есть только тогда, когда держит две вилки — взятую справа и слева.
Каждый философ может взять ближайшую вилку (если она доступна), или положить — если он уже держит её. Взятие каждой вилки и возвращение её на стол являются раздельными действиями, которые должны выполняться одно за другим.
Суть проблемы заключается в том, чтобы разработать модель поведения (параллельный алгоритм), при котором ни один из философов не будет голодать, то есть будет вечно чередовать приём пищи и размышления.
@Active
public class Philosopher {
private final String name;
private final Fork leftFork;
private final Fork rightFork;
public Philosopher(String name, Fork leftFork, Fork rightFork) {
this.name = name;
this.leftFork = leftFork;
this.rightFork = rightFork;
}
@Oneway
public void start() {
while (true) {
Fork.Handle left = leftFork.take();
if (left == null) {
continue;
}
Fork.Handle right = rightFork.take();
if (right == null) {
left.put();
continue;
}
System.out.println(name + " starts to eat");
sleep(5000);
left.put();
right.put();
System.out.println(name + " starts to think");
sleep(5000);
}
}
private static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException ignored) {
}
}
}
@Active
public class Fork {
private boolean taken;
public Handle take() {
if (taken) {
return null;
}
taken = true;
return new Handle();
}
@Include
private void put() {
taken = false;
}
@Active
public class Handle {
private boolean used;
private Handle() {
}
public void put() {
if (used) {
throw new IllegalStateException();
}
used = true;
Fork.this.put();
}
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
Fork fork1 = new Fork();
Fork fork2 = new Fork();
Fork fork3 = new Fork();
Fork fork4 = new Fork();
Fork fork5 = new Fork();
Philosopher philosopher1 = new Philosopher("Descartes", fork1, fork2);
Philosopher philosopher2 = new Philosopher("Nietzsche", fork2, fork3);
Philosopher philosopher3 = new Philosopher("Kant", fork3, fork4);
Philosopher philosopher4 = new Philosopher("Hume", fork4, fork5);
Philosopher philosopher5 = new Philosopher("Plato", fork5, fork1);
philosopher1.start();
philosopher2.start();
philosopher3.start();
philosopher4.start();
philosopher5.start();
Thread.sleep(60000);
}
}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface Active {
}
public aspect ActiveObjectAspect {
public interface ActiveObject {
}
declare parents: @Active * implements ActiveObject;
private BlockingQueue<Runnable> ActiveObject.queue = new LinkedBlockingQueue<>();
}
final class ActiveObjectThread extends Thread {
private final BlockingQueue<? extends Runnable> queue;
ActiveObjectThread(BlockingQueue<? extends Runnable> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
Runnable task;
try {
task = queue.take();
} catch (InterruptedException ignored) {
continue;
}
task.run();
}
}
}
public aspect ActiveObjectAspect {
...
after(ActiveObject obj) returning: initialization((ActiveObject+ && !ActiveObject).new(..)) && this(obj) {
new ActiveObjectThread(obj.queue).start();
}
}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Oneway {
}
public aspect ActiveObjectAspect {
...
void around(final ActiveObject obj): execution(@Oneway void ActiveObject+.*(..)) && this(obj) {
Runnable task = new Runnable() {
@Override
public void run() {
try {
proceed(obj);
} catch (Throwable e) {
e.printStackTrace();
}
}
};
boolean interrupted = false;
try {
while (true) {
try {
obj.queue.put(task);
break;
} catch (InterruptedException ignored) {
interrupted = true;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
}
public aspect ActiveObjectAspect {
...
Object around(final ActiveObject obj): execution(!@Oneway * ActiveObject+.*(..)) && this(obj) {
RunnableFuture<?> task = new FutureTask<>(new Callable<Object>() {
@Override
public Object call() throws Exception {
return proceed(obj);
}
});
boolean interrupted = false;
try {
while (true) {
try {
obj.queue.put(task);
break;
} catch (InterruptedException ignored) {
interrupted = true;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
try {
interrupted = false;
try {
while (true) {
try {
return task.get();
} catch (InterruptedException ignored) {
interrupted = true;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
} catch (ExecutionException e) {
throw e.getCause(); // ошибка компиляции
}
}
}
public aspect ActiveObjectAspect {
...
Object around(final ActiveObject obj): execution(!@Oneway * ActiveObject+.*(..)) && this(obj) {
...
try {
...
} catch (ExecutionException e) {
throw ActiveObjectAspect.<RuntimeException>throwException(e.getCause());
}
}
@SuppressWarnings("unchecked")
private static <E extends Throwable> E throwException(Throwable exception) throws E {
throw (E) exception;
}
}
public interface Disposable {
void dispose();
}
public final class Disposer {
private final ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>();
private final Map<Object, Disposable> disposables = new ConcurrentHashMap<>();
public Disposer() {
DisposerThread thread = new DisposerThread(referenceQueue, disposables);
thread.setName(Disposer.class.getSimpleName());
thread.setDaemon(true);
thread.start();
}
public void register(Object obj, Disposable disposable) {
Objects.requireNonNull(obj);
Objects.requireNonNull(disposable);
disposables.put(new PhantomReference<>(obj, referenceQueue), disposable);
}
private static final class DisposerThread extends Thread {
private final ReferenceQueue<?> referenceQueue;
private final Map<?, ? extends Disposable> disposables;
DisposerThread(ReferenceQueue<?> referenceQueue, Map<?, ? extends Disposable> disposables) {
this.referenceQueue = referenceQueue;
this.disposables = disposables;
}
@Override
public void run() {
while (true) {
Reference<?> reference;
try {
reference = referenceQueue.remove();
} catch (InterruptedException ignored) {
continue;
}
Disposable disposable = disposables.remove(reference);
try {
disposable.dispose();
} catch (Throwable e) {
e.printStackTrace();
}
}
}
}
}
final class ActiveObjectThread extends Thread implements Disposable {
...
volatile boolean running;
@Override
public void run() {
while (running) {
...
}
}
@Override
public void dispose() {
running = false;
interrupt();
}
}
public aspect ActiveObjectAspect {
...
private static final Disposer disposer = new Disposer();
after(ActiveObject obj) returning: initialization((ActiveObject+ && !ActiveObject).new(..)) && this(obj) {
...
disposer.register(obj, thread);
thread.start();
}
...
}
public aspect ActiveObjectAspect {
...
after(ActiveObject obj) returning: initialization((ActiveObject+ && !ActiveObject).new(..)) && this(obj) {
...
thread.running = true;
thread.start();
}
...
}
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
...
<build>
<plugins>
...
<plugin>
<groupId>org.jvnet.zephyr.maven</groupId>
<artifactId>remapping-maven-plugin</artifactId>
<configuration>
<outputDirectory>${project.build.directory}/remapping-classes</outputDirectory>
<testOutputDirectory>${project.build.directory}/remapping-test-classes</testOutputDirectory>
<mappingEntries>
<mappingEntry>
<oldName>java/lang/Thread</oldName>
<newName>org/jvnet/zephyr/jcl/java/lang/Thread</newName>
</mappingEntry>
<mappingEntry>
<oldName>java/util/concurrent/FutureTask</oldName>
<newName>org/jvnet/zephyr/jcl/java/util/concurrent/FutureTask</newName>
</mappingEntry>
<mappingEntry>
<oldName>java/util/concurrent/LinkedBlockingQueue</oldName>
<newName>org/jvnet/zephyr/jcl/java/util/concurrent/LinkedBlockingQueue</newName>
</mappingEntry>
</mappingEntries>
</configuration>
<executions>
<execution>
<goals>
<goal>remapping</goal>
<goal>testRemapping</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.jvnet.zephyr.maven</groupId>
<artifactId>javaflow-maven-plugin</artifactId>
<configuration>
<classesDirectory>${project.build.directory}/remapping-classes</classesDirectory>
<testClassesDirectory>${project.build.directory}/remapping-test-classes</testClassesDirectory>
</configuration>
<dependencies>
<dependency>
<groupId>org.jvnet.zephyr.thread</groupId>
<artifactId>thread-api</artifactId>
<version>${zephyr.version}</version>
</dependency>
<dependency>
<groupId>org.jvnet.zephyr.jcl</groupId>
<artifactId>jcl-jdk7</artifactId>
<version>${zephyr.version}</version>
</dependency>
</dependencies>
<executions>
<execution>
<goals>
<goal>javaflow</goal>
<goal>testJavaflow</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<id>javaflow</id>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>javaflow</classifier>
<classesDirectory>${project.build.directory}/javaflow-classes</classesDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
К сожалению, не доступен сервер mySQL