EventImpl.java
package sprouts.impl;
import org.slf4j.Logger;
import sprouts.*;
import java.lang.ref.WeakReference;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Default implementation of both {@link Event} and {@link Observable}.
 * <p>
 * An instance keeps two immutable tuples inside {@link AtomicReference}s:
 * <ul>
 *   <li>the {@link Observer}s subscribed directly to this instance, and</li>
 *   <li>weak references to the "vessel" events handed out by {@link #observable()},
 *       which are fired whenever this event is fired.</li>
 * </ul>
 * Every modification of those tuples goes through
 * {@link AtomicReference#updateAndGet(java.util.function.UnaryOperator)} so that
 * concurrent modifications cannot overwrite each other. The update functions may be
 * re-applied on contention, so they must stay free of side effects.
 * NOTE(review): this relies on the {@code Tuple} operations returning new tuples rather
 * than mutating in place, as the assignment-style usage suggests. Confirm against {@code Tuple}.
 */
final class EventImpl implements Observable, Event {
    private static final Logger log = org.slf4j.LoggerFactory.getLogger(EventImpl.class);

    /** Runs the notification work submitted by {@link #fire()}. */
    private final Executor executor;

    /** Weakly held events created by {@link #observable()}; they are fired together with this event. */
    @SuppressWarnings({"unchecked", "rawtypes"}) // raw cast needed: no class literal exists for WeakReference<EventImpl>
    private final AtomicReference<Tuple<WeakReference<EventImpl>>> vessels = new AtomicReference<>((Tuple) Tuple.of(WeakReference.class));

    /** Observers subscribed directly to this instance. */
    private final AtomicReference<Tuple<Observer>> observers = new AtomicReference<>(Tuple.of(Observer.class));

    /**
     * @param executor the executor to which every {@link #fire()} call submits its notification work.
     */
    EventImpl(Executor executor) {
        this.executor = executor;
    }

    /**
     * @return a snapshot of the currently registered vessel references.
     */
    @SuppressWarnings("NullAway")
    private Tuple<WeakReference<EventImpl>> _getVessels() {
        return vessels.get();
    }

    /**
     * @return a snapshot of the currently subscribed observers.
     */
    @SuppressWarnings("NullAway")
    private Tuple<Observer> _getObservers() {
        return observers.get();
    }

    /**
     * Submits a task to the executor which invokes all observers of this event,
     * then fires all vessel events which are still reachable, and finally drops
     * the vessel references whose referent is gone.
     * An exception thrown by one observer is logged and does not prevent the
     * remaining observers from being invoked.
     */
    @Override
    public void fire() {
        executor.execute(() -> {
            for (Observer observer : _getObservers()) {
                try {
                    observer.invoke();
                } catch (Exception e) {
                    log.error("Error invoking observer!", e);
                }
            }
            for (WeakReference<EventImpl> vessel : _getVessels()) {
                EventImpl event = vessel.get();
                if (event != null) {
                    event.fire();
                }
            }
            // Atomic prune: a plain get-then-set could discard a vessel that was
            // added concurrently by observable() between the read and the write.
            vessels.updateAndGet(current -> current.removeIf(ref -> ref.get() == null));
        });
    }

    /**
     * Creates a new event sharing this event's executor and registers it weakly,
     * so that it is fired whenever this event is fired for as long as it is reachable.
     *
     * @return the newly created vessel, exposed as an {@link Observable}.
     */
    @Override
    public Observable observable() {
        EventImpl vessel = new EventImpl(executor);
        WeakReference<EventImpl> weakVessel = new WeakReference<>(vessel);
        vessels.updateAndGet(current -> current.add(weakVessel));
        return vessel;
    }

    /**
     * Subscribes the given observer to this event.
     * <p>
     * A {@link WeakObserver} is only subscribed if its owner is still present; in that
     * case an {@link AutomaticUnSubscriber} is additionally registered for the owner at
     * the {@code ChangeListenerCleaner}.
     * NOTE(review): presumably the cleaner runs it once the owner is garbage collected. Confirm.
     *
     * @param observer the observer to subscribe.
     * @return this instance.
     */
    @Override
    public Observable subscribe(Observer observer) {
        if (observer instanceof WeakObserver) {
            WeakObserver<?> weakObserver = (WeakObserver<?>) observer;
            weakObserver.owner().ifPresent(owner -> {
                observers.updateAndGet(current -> current.add(observer));
                WeakReference<EventImpl> weakThis = new WeakReference<>(this);
                AutomaticUnSubscriber cleaner = new AutomaticUnSubscriber(weakThis, weakObserver);
                ChangeListenerCleaner.getInstance().register(owner, cleaner);
            });
        } else {
            observers.updateAndGet(current -> current.add(observer));
        }
        return this;
    }

    /**
     * Removes the given subscriber from this event if it is an {@link Observer};
     * any other kind of subscriber is ignored.
     *
     * @param subscriber the subscriber to remove.
     * @return this instance.
     */
    @Override
    public Observable unsubscribe(Subscriber subscriber) {
        if (subscriber instanceof Observer) {
            observers.updateAndGet(current -> current.remove((Observer) subscriber));
        }
        return this;
    }

    /**
     * Removes all observers subscribed directly to this event.
     * The vessels created through {@link #observable()} are not touched.
     */
    @Override
    public void unsubscribeAll() {
        observers.updateAndGet(current -> current.clear());
    }

    /**
     * Cleanup task which clears a {@link WeakObserver} and removes it from the event
     * it was subscribed to. The event is only referenced weakly, so this task does not
     * keep the event alive.
     */
    private static final class AutomaticUnSubscriber implements Runnable {
        private final WeakReference<EventImpl> weakThis;
        private final WeakObserver<?> observer;

        private AutomaticUnSubscriber(WeakReference<EventImpl> weakThis, WeakObserver<?> observer) {
            this.weakThis = weakThis;
            this.observer = observer;
        }

        /**
         * Does nothing if the event is already gone. Otherwise clears the observer
         * (logging any exception thrown while doing so) and unsubscribes it from the event.
         */
        @Override
        public void run() {
            EventImpl strongThis = weakThis.get();
            if (strongThis == null) {
                return;
            }
            try {
                observer.clear();
            } catch (Exception e) {
                log.error(
                    "An error occurred while clearing the weak observer '{}' during the process of " +
                    "removing it from the list of change actions.", observer, e
                );
            }
            // Atomic removal, so that a concurrent subscribe/unsubscribe on the event is not lost.
            strongThis.observers.updateAndGet(current -> current.remove(observer));
        }
    }
}