Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


Removing consumed item from the queue in Hazelcast

I'm trying to implement a producer-consumer model using Hazelcast.
The producer puts an item into the queue and the consumer consumes it using the take() method.
When I close the consumer application and start it again, the consumer retrieves the previously consumed item from the queue.
I tried the Hazelcast Ringbuffer and I see the same behavior.
Is there a way to force removal of the consumed item from the queue in Hazelcast?
Thanks in advance.

Producer.java:

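// Imports assume the Hazelcast 4.x package layout.
import java.util.List;
import java.util.Scanner;

import com.hazelcast.cluster.Cluster;
import com.hazelcast.cluster.MembershipEvent;
import com.hazelcast.cluster.MembershipListener;
import com.hazelcast.collection.IQueue;
import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.cp.IAtomicLong;
import com.hazelcast.map.IMap;

public class Producer implements MembershipListener {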
  private HazelcastInstance hzInstance;

  private Cluster cluster;

  private IAtomicLong counter;

  private IQueue<Data> dataQueue;

  private IMap<String, List<Data>> dataByConsumerId;

  public static void main(String[] args) {
    Producer producer = new Producer();

    Scanner scanIn = new Scanner(System.in);
    while (true) {

      String cmd = scanIn.nextLine();
      if (cmd.equals("QUIT")) {
        break;
      } else if (cmd.equals("ADD")) {
        long x = producer.counter.addAndGet(1);
        producer.dataQueue.add(new Data(x, x + 1));

      }
    }
    scanIn.close();
    // Shut down the Hazelcast member so the JVM can exit after QUIT.
    producer.hzInstance.shutdown();

  }

  public Producer() {
    hzInstance = Hazelcast.newHazelcastInstance(configuration());

    counter = hzInstance.getCPSubsystem().getAtomicLong("COUNTER");

    dataByConsumerId = hzInstance.getMap("CONSUMER_DATA");

    dataQueue = hzInstance.getQueue("DATA_QUEUE");

    cluster = hzInstance.getCluster();
    cluster.addMembershipListener(this);
  }

  public Config configuration() {
    Config config = new Config();
    config.setInstanceName("hazelcast-instance");
    MapConfig mapConfig = new MapConfig();
    mapConfig.setName("configuration");
    mapConfig.setTimeToLiveSeconds(-1);
    config.addMapConfig(mapConfig);
    return config;
  }

  @Override
  public void memberAdded(MembershipEvent membershipEvent) {

  }

  @Override
  public void memberRemoved(MembershipEvent membershipEvent) {
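    // Every item recorded under the departed consumer's id is pushed back
    // onto the queue, including items that consumer had already taken.
    String removedConsumerId = membershipEvent.getMember().getUuid().toString();
    List<Data> items = dataByConsumerId.remove(removedConsumerId);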
    if (items == null)
      return;
    items.forEach(item -> {
      System.out.println("Push data to recover :" + item.toString());
      dataQueue.add(item);
    });

  }

}
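
A note on the memberRemoved handler above: it pushes every item recorded for the departed consumer back onto the queue, which could by itself account for a restarted consumer seeing an item again. The sketch below imitates that flow with plain JDK collections so it runs without a cluster. LinkedBlockingQueue and ConcurrentHashMap are stand-ins for IQueue and IMap, and the class name, method names and consumer ids are hypothetical; it is an illustration under those assumptions, not a reproduction of Hazelcast's behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

public class RequeueOnLeaveDemo {
    // Stand-ins for DATA_QUEUE and CONSUMER_DATA (assumption: JDK collections, no cluster).
    public static final BlockingQueue<String> dataQueue = new LinkedBlockingQueue<>();
    public static final Map<String, List<String>> dataByConsumerId = new ConcurrentHashMap<>();

    // Mirrors Consumer.run(): take an item, then record it under the consumer's id.
    public static String consume(String consumerId) {
        try {
            String item = dataQueue.take(); // take() removes the head of the queue
            dataByConsumerId.computeIfAbsent(consumerId, k -> new ArrayList<>()).add(item);
            return item;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Mirrors Producer.memberRemoved(): push the departed consumer's items back.
    public static void memberRemoved(String consumerId) {
        List<String> items = dataByConsumerId.remove(consumerId);
        if (items != null) {
            dataQueue.addAll(items);
        }
    }

    public static void main(String[] args) {
        dataQueue.add("Data [x=1, y=2]");
        consume("consumer-A");
        System.out.println("after take: " + dataQueue.size());
        memberRemoved("consumer-A"); // consumer-A is closed
        System.out.println("after member removal: " + dataQueue.size());
        System.out.println("restarted consumer sees: " + consume("consumer-B"));
    }
}
```

In this simulation the queue is empty right after take() and holds the item again once the removal handler has run, so the item reappears because it was re-added, not because take() failed to remove it.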

Consumer.java:

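// Imports assume the Hazelcast 4.x package layout.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import com.hazelcast.collection.IQueue;
import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;

public class Consumer {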
  private String id;
  private HazelcastInstance hzInstance;
  private IMap<String, List<Data>> dataByConsumerId;
  private IQueue<Data> dataQueue;
  
  public Consumer() {
    hzInstance = Hazelcast.newHazelcastInstance(configuration());
    id = hzInstance.getLocalEndpoint().getUuid().toString();
    dataByConsumerId = hzInstance.getMap("CONSUMER_DATA");
    dataByConsumerId.put(id, new ArrayList<Data>());
    dataQueue = hzInstance.getQueue("DATA_QUEUE");
  }

  public Config configuration() {
    Config config = new Config();
    config.setInstanceName("hazelcast-instance");
    MapConfig mapConfig = new MapConfig();
    mapConfig.setName("configuration");
    mapConfig.setTimeToLiveSeconds(-1);
    config.addMapConfig(mapConfig);
    return config;
  }
  
  public static void main(String[] args) {
    Consumer consumer = new Consumer();
    try {
      consumer.run();
      System.in.read();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
  
  private void run() {
    while (true) {

      System.out.println("Take queue item...");
      try {
        var item = dataQueue.take();
        System.out.println("New item taken:" + item.toString());
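        // With the default in-memory format, get() returns a copy of the stored
        // list, so the modified list has to be written back with put().
        var dataInCluster = dataByConsumerId.get(id);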
        dataInCluster.add(item);
        dataByConsumerId.put(id, dataInCluster);
      } catch (InterruptedException e) {
        // Restore the interrupt flag and leave the loop instead of spinning.
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

}
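
One possible direction, sketched here with plain JDK collections, is to keep only in-flight items in the per-consumer list: record an item when it is taken, drop it once processing has finished, and re-queue whatever is still listed when the consumer leaves. The class name, the acknowledge step and the consumer id are hypothetical and do not appear in the code above. LinkedBlockingQueue and ConcurrentHashMap again stand in for IQueue and IMap, and with a real IMap the modified list would still have to be written back with put().

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

public class InFlightTrackingDemo {
    // Stand-ins for the distributed queue and map (assumption: JDK collections, no cluster).
    public static final BlockingQueue<String> dataQueue = new LinkedBlockingQueue<>();
    public static final Map<String, List<String>> inFlightByConsumerId = new ConcurrentHashMap<>();

    // Take an item and mark it as in flight for this consumer.
    public static String takeItem(String consumerId) {
        try {
            String item = dataQueue.take();
            inFlightByConsumerId
                .computeIfAbsent(consumerId, k -> new CopyOnWriteArrayList<>())
                .add(item);
            return item;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Hypothetical acknowledge step: processing is done, so stop tracking the item.
    public static void acknowledge(String consumerId, String item) {
        List<String> inFlight = inFlightByConsumerId.get(consumerId);
        if (inFlight != null) {
            inFlight.remove(item);
        }
    }

    // Recovery: only items that were taken but never acknowledged go back on the queue.
    public static void memberRemoved(String consumerId) {
        List<String> unfinished = inFlightByConsumerId.remove(consumerId);
        if (unfinished != null) {
            dataQueue.addAll(unfinished);
        }
    }

    public static void main(String[] args) {
        dataQueue.add("item-1");
        dataQueue.add("item-2");
        String first = takeItem("consumer-A");
        takeItem("consumer-A");            // taken but never acknowledged
        acknowledge("consumer-A", first);  // item-1 is fully processed
        memberRemoved("consumer-A");       // consumer-A is closed
        System.out.println(dataQueue);     // only the unacknowledged item is re-queued
    }
}
```

With this split, an item the consumer finished before shutting down is not handed out again, while an item it was still holding is recovered.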

Data.java:

import java.io.Serializable;

public class Data implements Serializable {

  private static final long serialVersionUID = -975095628505008933L;

  private long x, y;

  public Data(long x, long y) {
    super();
    this.x = x;
    this.y = y;
  }

  public long getX() {
    return x;
  }

  public void setX(long x) {
    this.x = x;
  }

  public long getY() {
    return y;
  }

  public void setY(long y) {
    this.y = y;
  }

  @Override
  public String toString() {
    return "Data [x=" + x + ", y=" + y + "]";
  }

}
Question from: https://stackoverflow.com/questions/65643827/removing-consumed-item-from-the-queue-in-hazelcast


1 Reply

Waiting for answers
