I'm trying to implement a producer-consumer model with Hazelcast.
The producer puts an item on a queue and the consumer consumes it with the take() method.
When I close the consumer application and start it again, the consumer retrieves the previously consumed item from the queue.
I tried the Hazelcast Ringbuffer and saw the same behavior.
Is there a way to force removal of a consumed item from the queue in Hazelcast?
Thanks in advance
Producer.java:
// Imports assume the Hazelcast 4.x/5.x package layout.
import java.util.List;
import java.util.Scanner;

import com.hazelcast.cluster.Cluster;
import com.hazelcast.cluster.MembershipEvent;
import com.hazelcast.cluster.MembershipListener;
import com.hazelcast.collection.IQueue;
import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.cp.IAtomicLong;
import com.hazelcast.map.IMap;

public class Producer implements MembershipListener {
    private HazelcastInstance hzInstance;
    private Cluster cluster;
    private IAtomicLong counter;
    private IQueue<Data> dataQueue;
    private IMap<String, List<Data>> dataByConsumerId;

    public static void main(String[] args) {
        Producer producer = new Producer();
        Scanner scanIn = new Scanner(System.in);
        // Read commands from stdin: ADD enqueues a new item, QUIT exits.
        while (true) {
            String cmd = scanIn.nextLine();
            if (cmd.equals("QUIT")) {
                break;
            } else if (cmd.equals("ADD")) {
                long x = producer.counter.addAndGet(1);
                producer.dataQueue.add(new Data(x, x + 1));
            }
        }
        scanIn.close();
    }

    public Producer() {
        hzInstance = Hazelcast.newHazelcastInstance(configuration());
        counter = hzInstance.getCPSubsystem().getAtomicLong("COUNTER");
        dataByConsumerId = hzInstance.getMap("CONSUMER_DATA");
        dataQueue = hzInstance.getQueue("DATA_QUEUE");
        cluster = hzInstance.getCluster();
        cluster.addMembershipListener(this);
    }

    public Config configuration() {
        Config config = new Config();
        config.setInstanceName("hazelcast-instance");
        MapConfig mapConfig = new MapConfig();
        mapConfig.setName("configuration");
        mapConfig.setTimeToLiveSeconds(-1);
        config.addMapConfig(mapConfig);
        return config;
    }

    @Override
    public void memberAdded(MembershipEvent membershipEvent) {
    }

    @Override
    public void memberRemoved(MembershipEvent membershipEvent) {
        // When a consumer leaves the cluster, push the items recorded
        // under its id back onto the queue.
        String removedConsumerId = membershipEvent.getMember().getUuid().toString();
        List<Data> items = dataByConsumerId.remove(removedConsumerId);
        if (items == null) {
            return;
        }
        items.forEach(item -> {
            System.out.println("Push data to recover :" + item.toString());
            dataQueue.add(item);
        });
    }
}
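To reason about the recovery path in memberRemoved without a running cluster, here is a minimal plain-Java sketch. It is not Hazelcast code: LinkedBlockingQueue and HashMap are assumed stand-ins for IQueue and IMap, and RecoverySketch, recover and the id "consumer-1" are hypothetical names. It suggests that any item still recorded under a departed consumer's id is added back to the queue, which may be why a restarted consumer sees an already consumed item again.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

public class RecoverySketch {
    // Mirrors the body of memberRemoved: remove the consumer's entry and
    // re-add every recorded item to the queue. Returns the number requeued.
    static <T> int recover(Map<String, List<T>> dataByConsumerId,
                           Queue<T> dataQueue,
                           String removedConsumerId) {
        List<T> items = dataByConsumerId.remove(removedConsumerId);
        if (items == null) {
            return 0;
        }
        items.forEach(dataQueue::add);
        return items.size();
    }

    public static void main(String[] args) {
        // Stand-ins (assumptions): plain collections instead of IQueue and IMap.
        Queue<String> dataQueue = new LinkedBlockingQueue<>();
        Map<String, List<String>> dataByConsumerId = new HashMap<>();
        dataByConsumerId.put("consumer-1", new ArrayList<>());

        // Producer adds an item; the consumer removes it and records a copy.
        dataQueue.add("item-1");
        String taken = dataQueue.poll();
        dataByConsumerId.get("consumer-1").add(taken);
        System.out.println("Queue size after consume: " + dataQueue.size());

        // Consumer leaves the cluster: the recorded item goes back on the queue.
        int requeued = recover(dataByConsumerId, dataQueue, "consumer-1");
        System.out.println("Requeued: " + requeued + ", queue size: " + dataQueue.size());
    }
}
```

If this matches what happens in the cluster, removing an item from the consumer's list once it has been fully processed would keep it out of the recovery path.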
Consumer.java:
// Imports assume the Hazelcast 4.x/5.x package layout.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import com.hazelcast.collection.IQueue;
import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;

public class Consumer {
    private String id;
    private HazelcastInstance hzInstance;
    private IMap<String, List<Data>> dataByConsumerId;
    private IQueue<Data> dataQueue;

    public Consumer() {
        hzInstance = Hazelcast.newHazelcastInstance(configuration());
        id = hzInstance.getLocalEndpoint().getUuid().toString();
        // Register an empty list of consumed items under this consumer's id.
        dataByConsumerId = hzInstance.getMap("CONSUMER_DATA");
        dataByConsumerId.put(id, new ArrayList<Data>());
        dataQueue = hzInstance.getQueue("DATA_QUEUE");
    }

    public Config configuration() {
        Config config = new Config();
        config.setInstanceName("hazelcast-instance");
        MapConfig mapConfig = new MapConfig();
        mapConfig.setName("configuration");
        mapConfig.setTimeToLiveSeconds(-1);
        config.addMapConfig(mapConfig);
        return config;
    }

    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        try {
            consumer.run();
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void run() {
        while (true) {
            System.out.println("Take queue item...");
            try {
                // Block until an item is available, then take it off the queue.
                var item = dataQueue.take();
                System.out.println("New item taken:" + item.toString());
                // Record the taken item under this consumer's id in the shared map.
                var dataInCluster = dataByConsumerId.get(id);
                dataInCluster.add(item);
                dataByConsumerId.put(id, dataInCluster);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
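As a sanity check on take() itself: IQueue extends java.util.concurrent.BlockingQueue, whose contract says take() retrieves and removes the head of the queue. The sketch below uses LinkedBlockingQueue as an assumed stand-in for IQueue and a plain list in place of the map entry; TakeSketch and takeAndTrack are hypothetical names. It shows the item leaving the queue while a copy stays in the tracking list, as in run() above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TakeSketch {
    // Takes the head of the queue (removing it) and records a copy in 'tracked',
    // the same two steps the consumer loop performs.
    static <T> T takeAndTrack(BlockingQueue<T> queue, List<T> tracked)
            throws InterruptedException {
        T item = queue.take();
        tracked.add(item);
        return item;
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in (assumption): LinkedBlockingQueue instead of Hazelcast's IQueue.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> tracked = new ArrayList<>();

        queue.add("item-1");
        String item = takeAndTrack(queue, tracked);

        System.out.println("Taken: " + item);
        System.out.println("Queue empty after take: " + queue.isEmpty());
        System.out.println("Tracked copies: " + tracked.size());
    }
}
```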
Data.java:
import java.io.Serializable;

public class Data implements Serializable {
    private static final long serialVersionUID = -975095628505008933L;
    private long x, y;

    public Data(long x, long y) {
        super();
        this.x = x;
        this.y = y;
    }

    public long getX() {
        return x;
    }

    public void setX(long x) {
        this.x = x;
    }

    public long getY() {
        return y;
    }

    public void setY(long y) {
        this.y = y;
    }

    @Override
    public String toString() {
        return "Data [x=" + x + ", y=" + y + "]";
    }
}
question from:
https://stackoverflow.com/questions/65643827/removing-consumed-item-from-the-queue-in-hazelcast