Code Review Asked by Bruno Thomas on January 2, 2021
We want to provide a simple in-memory alternative to Redis pub/sub for our software, so we implemented this:
public class MemoryDataBus implements DataBus {
    private final Map<Consumer<Message>, MessageListener> subscribers = new ConcurrentHashMap<>();

    @Override
    public void publish(final Channel channel, final Message message) {
        Message nonNullMessage = requireNonNull(message, "cannot publish a null message");
        subscribers.values().stream().filter(l -> l.hasSubscribedTo(channel)).forEach(l -> l.accept(nonNullMessage));
    }

    @Override
    public void subscribe(final Consumer<Message> subscriber, final Channel... channels) throws InterruptedException {
        MessageListener listener = new MessageListener(subscriber, channels);
        subscribers.put(subscriber, listener);
        listener.loopUntilShutdown();
    }

    @Override
    public void unsubscribe(Consumer<Message> subscriber) {
        ofNullable(subscribers.remove(subscriber)).ifPresent(l -> {
            l.accept(new ShutdownMessage());
        });
    }

    private static class MessageListener implements Consumer<Message> {
        private final Consumer<Message> subscriber;
        private final LinkedHashSet<Channel> channels;
        final AtomicReference<Message> message = new AtomicReference<>();

        public MessageListener(Consumer<Message> subscriber, Channel... channels) {
            this.subscriber = subscriber;
            this.channels = asSet(channels);
        }

        boolean hasSubscribedTo(Channel channel) {
            return channels.contains(channel);
        }

        @Override
        public void accept(Message message) {
            subscriber.accept(message);
            synchronized (this.message) {
                this.message.set(message);
                this.message.notify();
            }
        }

        boolean shutdownAsked() {
            Message message = this.message.get();
            return message != null && message.type == Message.Type.SHUTDOWN;
        }

        void loopUntilShutdown() throws InterruptedException {
            synchronized (message) {
                while (!shutdownAsked()) {
                    message.wait();
                }
            }
        }
    }
}
I’ve removed unnecessary code that only adds noise (logs, counters); the source code is here.
Our unit tests are green, and manual testing shows no regression compared to Redis. But since we also know how difficult it is to write a correct thread-safe implementation, we want to verify it with threading experts.
PS: I should add that this question was originally posted on SO.
Assuming requireNonNull(...) is java.util.Objects.requireNonNull(...), which throws an exception if its argument is null, there is no need for a separate local variable to hold the return value.
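As a minimal sketch of that point (the class RequireNonNullSketch and its String-based publish method are hypothetical stand-ins, not the original MemoryDataBus API), the null check can stand on its own line and the parameter can be used directly afterwards:

```java
import java.util.Objects;

public class RequireNonNullSketch {
    // Hypothetical stand-in for publish(...): a String replaces the Message type.
    public static String publish(String message) {
        // requireNonNull either throws NullPointerException or returns its argument
        // unchanged, so the parameter itself is safe to use after this line.
        Objects.requireNonNull(message, "cannot publish a null message");
        return "published: " + message;
    }

    public static void main(String[] args) {
        System.out.println(publish("hello"));
        try {
            publish(null);
        } catch (NullPointerException e) {
            // The exception carries the message passed to requireNonNull.
            System.out.println(e.getMessage());
        }
    }
}
```

Because the parameter in the question is already declared final, it can also be captured by the lambda in forEach without the extra variable.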
Assuming ofNullable(...) is java.util.Optional.ofNullable(...), I would recommend removing its usage here, because the code can be simplified:
// ofNullable(...).ifPresent(l -> ...);
// Simplified:
MyClass result = ...
if (result != null) {
...
}
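Applied to an unsubscribe-style method, the simplification might look like the following sketch. The class UnsubscribeSketch, the String keys, and the Runnable shutdown hooks are assumptions made for illustration only; the original code uses Consumer<Message> keys and a MessageListener value.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class UnsubscribeSketch {
    // Hypothetical registry: subscriber id -> action to run on shutdown.
    private final Map<String, Runnable> listeners = new ConcurrentHashMap<>();

    public void subscribe(String id, Runnable onShutdown) {
        listeners.put(id, onShutdown);
    }

    // Returns true if a listener was registered under this id and has been notified.
    public boolean unsubscribe(String id) {
        Runnable listener = listeners.remove(id); // null if id was never subscribed
        if (listener != null) {
            listener.run();
            return true;
        }
        return false;
    }
}
```

The plain null check avoids allocating an Optional and reads the same way as the lambda version.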
It looks like your locking in MessageListener will cause a deadlock:
MemoryDataBus.subscribe(...) calls loopUntilShutdown, which synchronizes on message.
MemoryDataBus.publish(...) calls accept, which has to wait because the lock on message is still held, preventing it from updating message.
Edit: No deadlock occurs, because Object.wait() is used, which releases ownership of the monitor (here the field message) while the thread is waiting.
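The behaviour described in the edit can be checked with a small stand-alone sketch. The class WaitReleasesMonitorSketch, its boolean flag, and the demo() helper are hypothetical and only mirror the shape of loopUntilShutdown() and accept(...); they are not taken from the original source.

```java
public class WaitReleasesMonitorSketch {
    private final Object lock = new Object();
    private boolean shutdown = false;

    // Mirrors loopUntilShutdown(): the monitor is held only while not waiting.
    public void awaitShutdown() throws InterruptedException {
        synchronized (lock) {
            while (!shutdown) { // the loop also guards against spurious wakeups
                lock.wait();    // releases the monitor until notified, then reacquires it
            }
        }
    }

    // Mirrors accept(...): can enter the synchronized block because wait() released the monitor.
    public void requestShutdown() {
        synchronized (lock) {
            shutdown = true;
            lock.notifyAll();
        }
    }

    // Returns true if the waiting thread terminated within the timeout.
    public static boolean demo() {
        WaitReleasesMonitorSketch sketch = new WaitReleasesMonitorSketch();
        Thread waiter = new Thread(() -> {
            try {
                sketch.awaitShutdown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        try {
            Thread.sleep(100); // lets the waiter reach wait(); not needed for correctness
            sketch.requestShutdown();
            waiter.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !waiter.isAlive();
    }
}
```

If wait() kept the monitor, requestShutdown() would block forever and demo() would return false after the join timeout.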
Answered by Marcono1234 on January 2, 2021