ChangeListeners.java
package sprouts.impl;
import org.slf4j.Logger;
import sprouts.Observer;
import sprouts.*;
import java.lang.ref.WeakReference;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Predicate;
final class ChangeListeners<T>
{
private static final Logger log = org.slf4j.LoggerFactory.getLogger(ChangeListeners.class);
private final Map<Channel, ChannelListeners<T>> _actions = new LinkedHashMap<>();
public ChangeListeners() {}
public ChangeListeners(ChangeListeners<T> other) {
_copyFrom(other);
}
private void _copyFrom(ChangeListeners<T> other) {
other._actions.forEach( (k, v) -> _actions.put(k, new ChannelListeners<>(v)) );
}
private ChannelListeners<T> _getActionsFor(Channel channel ) {
return _actions.computeIfAbsent(channel, k->new ChannelListeners<>());
}
private void _removeActionIf( Predicate<Action<ValDelegate<T>>> predicate ) {
for ( ChannelListeners<T> actions : _actions.values() )
actions.removeIf(predicate);
}
public void onChange( Channel channel, Action<ValDelegate<T>> action ) {
Objects.requireNonNull(channel);
Objects.requireNonNull(action);
_getActionsFor(channel).add(action);
}
public final void onChange( Observer observer ) {
onChange(Sprouts.factory().defaultObservableChannel(), new SproutChangeListener<>(observer) );
}
public void fireChange( Val<T> owner, Channel channel ) {
if ( channel == From.ALL)
for ( Channel key : _actions.keySet() )
_getActionsFor(key).trigger( channel, owner );
else {
_getActionsFor(channel).trigger( channel, owner );
_getActionsFor(From.ALL).trigger( channel, owner );
}
}
public void unsubscribe( Subscriber subscriber ) {
_removeActionIf( a -> {
if ( a instanceof SproutChangeListener ) {
SproutChangeListener<?> pcl = (SproutChangeListener<?>) a;
return pcl.listener() == subscriber;
}
else
return Objects.equals(a, subscriber);
});
}
public long numberOfChangeListeners() {
return _actions.values()
.stream()
.mapToLong(ChannelListeners::numberOfChangeListeners)
.sum();
}
@Override
public final String toString() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClass().getSimpleName()).append("[");
for ( Channel key : _actions.keySet() ) {
try {
sb.append(key).append("->").append(_actions.get(key)).append(", ");
} catch ( Exception e ) {
log.error("An error occurred while trying to get the number of change listeners for channel '{}'", key, e);
}
}
sb.append("]");
return sb.toString();
}
private static final class ChannelListeners<T> {
private final List<Action<ValDelegate<T>>> _channelActions = new ArrayList<>();
public ChannelListeners() {}
public ChannelListeners(ChannelListeners<T> other ) {
_channelActions.addAll(other._channelActions);
}
public void add( Action<ValDelegate<T>> action ) {
_getActions( actions -> {
if ( action instanceof WeakAction ) {
WeakAction<?,?> wa = (WeakAction<?,?>) action;
wa.owner().ifPresent( owner -> {
actions.add(action);
WeakReference<ChannelListeners<?>> weakThis = new WeakReference<>(this);
ChannelCleaner cleaner = new ChannelCleaner(weakThis, wa);
ChangeListenerCleaner.getInstance().register(owner, cleaner);
});
}
else
actions.add(action);
});
}
public void removeIf( Predicate<Action<ValDelegate<T>>> predicate ) {
_getActions( actions -> actions.removeIf(predicate) );
}
private synchronized long _getActions(Consumer<List<Action<ValDelegate<T>>>> receiver) {
receiver.accept(_channelActions);
return _channelActions.size();
}
private long numberOfChangeListeners() {
return _getActions( actions -> {} );
}
public void trigger( Channel channel, Val<T> owner ) {
ValDelegate<T> delegate = new ValDelegateImpl<>(channel, Val.ofNullable(owner)); // We clone this property to avoid concurrent modification
_getActions( actions -> {
for ( Action<ValDelegate<T>> action : actions ) // We copy the list to avoid concurrent modification
try {
action.accept(delegate);
} catch ( Exception e ) {
log.error(
"An error occurred while executing " +
"action '"+action+"' for property '"+owner+"'",
e
);
}
});
}
@Override
public final String toString() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClass().getSimpleName()).append("[");
for ( Action<ValDelegate<T>> action : _channelActions ) {
try {
sb.append(action).append(", ");
} catch ( Exception e ) {
log.error("An error occurred while trying to get the string representation of the action '{}'", action, e);
}
}
sb.append("]");
return sb.toString();
}
}
private static final class ChannelCleaner implements Runnable {
private final WeakReference<ChannelListeners<?>> weakThis;
private final WeakAction<?,?> wa;
private ChannelCleaner(WeakReference<ChannelListeners<?>> weakThis, WeakAction<?, ?> wa) {
this.weakThis = weakThis;
this.wa = wa;
}
@Override
public void run() {
ChannelListeners<?> strongThis = weakThis.get();
if ( strongThis == null )
return;
strongThis._getActions( innerActions -> {
try {
wa.clear();
} catch ( Exception e ) {
log.error(
"An error occurred while clearing the weak action '{}' during the process of " +
"removing it from the list of change actions.", wa, e
);
}
innerActions.remove(wa);
});
}
}
}