我正在开展一个项目,我需要消耗大量的记录,然后我将这些记录发送到使用ZeroMQ的其他系统.

这是流程:

>将所有传入的记录存储在多个线程的CHM中.记录将以非常高的速度进行.
>从每1分钟运行的后台线程将这些记录从CHM发送到ZeroMQ服务器.
>将每个记录发送到ZeroMQ服务器后,将它们添加到重试桶中,以便在特定时间过后,如果尚未收到此记录的确认,则可以重试.
>我们还有一个poller可运行的线程,它收到来自ZeroMQ服务器的确认,告诉这些记录已经收到,所以一旦我得到确认,我从重试桶中删除该记录,以便它不会被重试.
>即使有一些记录发送两次,没关系,但最好还是这样做.

我不知道在我的下面的情况下最好的方法是最小化.

下面是我的Processor类,其中一个.add()方法将被多个线程调用,以线程安全的方式填充dataHolderByPartitionReference CHM.然后,在Processor类的构造函数中,通过调用SendToZeroMQ类,如下所示,启动每隔30秒运行的后台线程,将记录从同一个CHM推送到一组ZeroMQ服务器.

处理器

public class Processor {
  private final scheduledexecutorservice executorService = Executors
      .newSingleThreadScheduledExecutor();
  private final atomicreference<ConcurrentHashMap<Integer,ConcurrentLinkedQueue<DataHolder>>> dataHolderByPartitionReference =
      new atomicreference<>(new ConcurrentHashMap<Integer,ConcurrentLinkedQueue<DataHolder>>());

  private static class Holder {
    private static final Processor INSTANCE = new Processor();
  }

  public static Processor getInstance() {
    return Holder.INSTANCE;
  }

  private Processor() {
    executorService.scheduleAtFixedrate(new Runnable() {
      @Override
      public void run() {
        validateAndSendAllPartitions(dataHolderByPartitionReference
            .getAndSet(new ConcurrentHashMap<Integer,ConcurrentLinkedQueue<DataHolder>>()));
      }
    },30,TimeUnit.SECONDS);
  }

  private void validateAndSendAllPartitions(
      ConcurrentHashMap<Integer,ConcurrentLinkedQueue<DataHolder>> dataHolderByPartition) {
        // calling validateAndSend in parallel for each partition (which is map key)
        // generally there will be only 5-6 unique partitions max
  }

  private void validateAndSend(final int partition,final ConcurrentLinkedQueue<DataHolder> dataHolders) {
    Map<byte[],byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
    int totalSize = 0;
    while (!dataHolders.isEmpty()) {
        .........
        .........
        SendToZeroMQ.getInstance().executeAsync(partition,clientKeyBytesAndProcessBytesHolder);
    }
    // calling again with remaining values
    SendToZeroMQ.getInstance().executeAsync(partition,clientKeyBytesAndProcessBytesHolder);
  }

  // called by multiple threads to populate dataHolderByPartitionReference CHM
  public void add(final int partition,final DataHolder holder) {
    // store records in dataHolderByPartitionReference in a thread safe way
  }
}

以下是我的SendToZeroMQ类,它将一条记录发送到一组ZeroMQ服务器,并根据确认传递相应地重试.

>首先,它将向ZeroMQ服务器发送一条记录.
>然后它将添加一个相同的记录retryBucket,这将稍后重试,取决于是否收到确认.
>在同一个类中,我启动了一个后台线程,每1分钟运行一次,再次发送记录,这些记录仍然在重试桶中.
>同样的类也启动ResponsePoller线程,它将永远保持运行,看看哪些记录已被确认(我们之前发送过的),所以一旦记录得到确认,ResponsePoller线程就会从retryBucket中删除这些记录,这样不要重试

SendToZeroMQ

public class SendToZeroMQ {
  // do I need these two scheduledexecutorservice or one is sufficient to start my both the thread?
  private final scheduledexecutorservice executorServicePoller = Executors
      .newSingleThreadScheduledExecutor();
  private final scheduledexecutorservice executorService = Executors
      .newSingleThreadScheduledExecutor();
  private final Cache<Long,byte[]> retryBucket = CacheBuilder.newBuilder().maximumSize(10000000)
      .removalListener(RemovalListeners.asynchronous(new CustomListener(),executorService))
      .build();

  private static class Holder {
    private static final SendToZeroMQ INSTANCE = new SendToZeroMQ();
  }

  public static SendToZeroMQ getInstance() {
    return Holder.INSTANCE;
  }

  private SendToZeroMQ() {
    executorServicePoller.submit(new ResponsePoller());
    executorService.scheduleAtFixedrate(new Runnable() {
      @Override
      public void run() {
        for (Entry<Long,byte[]> entry : retryBucket.asMap().entrySet()) {
          executeAsync(entry.getKey(),entry.getValue());
        }
      }
    },1,TimeUnit.MINUTES);
  }

  public boolean executeAsync(final long address,final byte[] encodedByteArray) {
    Optional<ZMQObj> liveSockets = PoolManager.getInstance().getNextSocket();
    if (!liveSockets.isPresent()) {
      return false;
    }
    return executeAsync(address,encodedByteArray,liveSockets.get().getSocket());
  }

  public boolean executeAsync(final long address,final byte[] encodedByteArray,final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(encodedByteArray);
    boolean sent = msg.send(socket);
    msg.destroy();
    // add to retry bucket
    retryBucket.put(address,encodedByteArray);
    return sent;
  }

  public boolean executeAsync(final int partition,final Map<byte[],byte[]> clientKeyBytesAndProcessBytesHolder) {
    Optional<ZMQObj> liveSockets = PoolManager.getInstance().getNextSocket();
    if (!liveSockets.isPresent()) {
      return false;
    }         
    Map<Long,byte[]> addresstoencodedByteArray = encode(partition,clientKeyBytesAndProcessBytesHolder);
    long address = addresstoencodedByteArray.entrySet().iterator().next().getKey();
    byte[] encodedByteArray = addresstoencodedByteArray.entrySet().iterator().next().getValue();
    return executeAsync(address,liveSockets.get().getSocket());
  }

  private Map<Long,byte[]> encode(final int partition,byte[]> clientKeyBytesAndProcessBytesHolder) {

    // this address will be unique always
    long address = TestUtils.getAddress();
    Frame frame = new Frame(............);
    byte[] packedByteArray = frame.serialize();
    // this map will always have one entry in it.
    return ImmutableMap.of(address,packedByteArray);
  }

  public void removeFromretryBucket(final long address) {
    retryBucket.invalidate(address);
  }
}

下面是我的ResponsePoller类,它等待所有这些记录的确认,这些记录已经被其他后台线程发送了.如果接收到确认,则从重试桶中删除确认信息,以免重新尝试.

public class ResponsePoller implements Runnable {
  private static final Random random = new Random();
  private static final int listenerPort = 8076;

  @Override
  public void run() {
    ZContext ctx = new ZContext();
    Socket client = ctx.createSocket(ZMQ.PULL);

    // Set random identity to make tracing easier
    String identity = String.format("%04X-%04X",random.nextInt(),random.nextInt());
    client.setIdentity(identity.getBytes(ZMQ.CHARSET));
    client.bind("tcp://" + TestUtils.getIPAddress() + ":" + listenerPort);

    PollItem[] items = new PollItem[] {new PollItem(client,Poller.POLLIN)};

    while (!Thread.currentThread().isInterrupted()) {
      // Tick once per second,pulling in arriving messages
      for (int centitick = 0; centitick < 100; centitick++) {
        ZMQ.poll(items,10);
        if (items[0].isReadable()) {
          ZMsg msg = ZMsg.recvMsg(client);
          Iterator<ZFrame> it = msg.iterator();
          while (it.hasNext()) {
            ZFrame frame = it.next();
            try {
              long address = TestUtils.getAddress(frame.getData());
              // remove from retry bucket since we got the ackNowledgment for this record
              SendToZeroMQ.getInstance().removeFromretryBucket(address);
            } catch (Exception ex) {
              // log error
            } finally {
              frame.destroy();
            }
          }
          msg.destroy();
        }
      }
    }
    ctx.destroy();
  }
}

题:

>我从设计的角度来看,设计这个问题的最好办法是什么,所以我所有的逻辑都能无缝地运作?
>我相当肯定有一个更好的方法来设计这个问题,而不是我有什么 – 更好的方法可以是什么?

解决方法

在我看来,只要您使用TCP进行底层通信,就不用担心“应用层”中的数据接收确认.

在这种情况下 – 由于ZeroMQ建立在TCP本身之上,进一步的优化,您不必担心成功的数据传输,只要传输层没有异常(这显然会反弹到您处理案例).

我看到你的问题的方式是 – 你正在运行Kafka消费者线程,它将接收和反弹消息到另一个消息队列(在这种情况下是ZMQ,它正在使用TCP并保证成功的消息传递,或者在较低级别引发异常通信层).

我可以想到的最简单的解决方案是使用线程池,从每个消费者内部,并尝试使用ZMQ发送消息.在任何网络错误的情况下,只要您的应用程序守护程序正在运行,您可以轻松地将该消息集中在以后的消费或日志记录中.

在提出的解决方案中,我假设消息的顺序不在问题空间中.你不是在看复杂的事情.

java – 如果没有收到确认,如何设计发送记录并重试发送的系统?的更多相关文章

  1. iOS:核心图像和多线程应用程序

    我试图以最有效的方式运行一些核心图像过滤器.试图避免内存警告和崩溃,这是我在渲染大图像时得到的.我正在看Apple的核心图像编程指南.关于多线程,它说:“每个线程必须创建自己的CIFilter对象.否则,你的应用程序可能会出现意外行为.”这是什么意思?我实际上是试图在后台线程上运行我的过滤器,所以我可以在主线程上运行HUD(见下文).这在coreImage的上下文中是否有意义?

  2. ios – 多个NSPersistentStoreCoordinator实例可以连接到同一个底层SQLite持久性存储吗?

    我读过的关于在多个线程上使用CoreData的所有内容都讨论了使用共享单个NSPersistentStoreCoordinator的多个NSManagedobjectContext实例.这是理解的,我已经使它在一个应用程序中工作,该应用程序在主线程上使用CoreData来支持UI,并且具有可能需要一段时间才能运行的后台获取操作.问题是NSPersistentStoreCoordinator会对基础

  3. ios – XCode断点应该只挂起当前线程

    我需要调试多线程错误.因此,为了获得生成崩溃的条件,我需要在代码中的特定点停止一个线程,并等待另一个线程到达第二个断点.我现在遇到的问题是,如果一个线程遇到断点,则所有其他线程都被挂起.有没有办法只停止一个线程,让其他线程运行,直到它们到达第二个断点?)其他更有趣的选择:当你点击第一个断点时,你可以进入控制台并写入这应该在该断点处暂停当前上下文中的线程一小时.然后在Xcode中恢复执行.

  4. ios – 在后台线程中写入Realm后,主线程看不到更新的数据

    >清除数据库.>进行API调用以获取新数据.>将从API检索到的数据写入后台线程中的数据库中.>从主线程上的数据库中读取数据并渲染UI.在步骤4中,数据应该是最新数据,但我们没有看到任何数据.解决方法具有runloops的线程上的Realm实例,例如主线程,updatetothelatestversionofthedataintheRealmfile,因为通知被发布到其线程的runloop.在后台

  5. ios – NSURLConnectionLoader线程中的奇怪崩溃

    我们开始看到我们的应用启动时发生的崩溃.我无法重现它,它只发生在少数用户身上.例外情况是:异常类型:EXC_BAD_ACCESS代码:KERN_INVALID_ADDRESS位于0x3250974659崩溃发生在名为com.apple.NSURLConnectionLoader的线程中在调用时–[NSBlockOperationmain]这是该线程的堆栈跟踪:非常感谢任何帮助,以了解可能导致这种崩

  6. ios – 合并子上下文时的NSObjectInaccessbileExceptions

    我尝试手动重现,但失败了.是否有其他可能发生这种情况的情况,是否有处理此类问题的提示?解决方法在创建子上下文时,您可以尝试使用以下行:

  7. ios – 从后台线程调用UIKit时发出警告

    你如何处理项目中的这个问题?

  8. ios – 在SpriteKit中,touchesBegan在与SKScene更新方法相同的线程中运行吗?

    在这里的Apple文档AdvancedSceneProcessing中,它描述了更新方法以及场景的呈现方式,但没有提到何时处理输入.目前尚不清楚它是否与渲染循环位于同一个线程中,或者它是否与它并发.如果我有一个对象,我从SKScene更新方法和touchesBegan方法(在这种情况下是SKSpriteNode)更新,我是否要担心同步对我的对象的两次访问?解决方法所以几天后没有回答我设置了一些实验

  9. ios – 在后台获取中加载UIWebView

    )那么,有一种方法可以在后台加载UIWebView吗?解决方法如果要从用户界面更新元素,则必须在应用程序的主队列(或线程)中访问它们.我建议您在后台继续获取所需的数据,但是当需要更新UIWebView时,请在主线程中进行.你可以这样做:或者您可以创建一个方法来更新UIWebView上的数据,并使用以下方法从后台线程调用它:这将确保您从正确的线程访问UIWebView.希望这可以帮助.

  10. ios – 何时使用Semaphore而不是Dispatch Group?

    我会假设我知道如何使用DispatchGroup,为了解问题,我尝试过:结果–预期–是:为了使用信号量,我实现了:并在viewDidLoad方法中调用它.结果是:从概念上讲,dispachGroup和Semaphore都有同样的目的.老实说,我不熟悉:什么时候使用信号量,尤其是在与dispachGroup合作时–可能–处理问题.我错过了什么部分?

随机推荐

  1. 基于EJB技术的商务预订系统的开发

    用EJB结构开发的应用程序是可伸缩的、事务型的、多用户安全的。总的来说,EJB是一个组件事务监控的标准服务器端的组件模型。基于EJB技术的系统结构模型EJB结构是一个服务端组件结构,是一个层次性结构,其结构模型如图1所示。图2:商务预订系统的构架EntityBean是为了现实世界的对象建造的模型,这些对象通常是数据库的一些持久记录。

  2. Java利用POI实现导入导出Excel表格

    这篇文章主要为大家详细介绍了Java利用POI实现导入导出Excel表格,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  3. Mybatis分页插件PageHelper手写实现示例

    这篇文章主要为大家介绍了Mybatis分页插件PageHelper手写实现示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  4. (jsp/html)网页上嵌入播放器(常用播放器代码整理)

    网页上嵌入播放器,只要在HTML上添加以上代码就OK了,下面整理了一些常用的播放器代码,总有一款适合你,感兴趣的朋友可以参考下哈,希望对你有所帮助

  5. Java 阻塞队列BlockingQueue详解

    本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景,通过实例代码介绍了Java 阻塞队列BlockingQueue的相关知识,需要的朋友可以参考下

  6. Java异常Exception详细讲解

    异常就是不正常,比如当我们身体出现了异常我们会根据身体情况选择喝开水、吃药、看病、等 异常处理方法。 java异常处理机制是我们java语言使用异常处理机制为程序提供了错误处理的能力,程序出现的错误,程序可以安全的退出,以保证程序正常的运行等

  7. Java Bean 作用域及它的几种类型介绍

    这篇文章主要介绍了Java Bean作用域及它的几种类型介绍,Spring框架作为一个管理Bean的IoC容器,那么Bean自然是Spring中的重要资源了,那Bean的作用域又是什么,接下来我们一起进入文章详细学习吧

  8. 面试突击之跨域问题的解决方案详解

    跨域问题本质是浏览器的一种保护机制,它的初衷是为了保证用户的安全,防止恶意网站窃取数据。那怎么解决这个问题呢?接下来我们一起来看

  9. Mybatis-Plus接口BaseMapper与Services使用详解

    这篇文章主要为大家介绍了Mybatis-Plus接口BaseMapper与Services使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  10. mybatis-plus雪花算法增强idworker的实现

    今天聊聊在mybatis-plus中引入分布式ID生成框架idworker,进一步增强实现生成分布式唯一ID,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

返回
顶部