How do I make sure my Apache Ignite 2.x distributed cache puts are asynchronous?

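Question

Below I have a distributed cache example using Apache Ignite.

I want my cache put operation, cache.put(i, new X12File("x12file" + i, LocalDateTime.now().toString()));, to be completely asynchronous. That is, the put itself should return very quickly, and pushing the data to the rest of the cluster should happen in the background without inconveniencing the user. I need "fire and forget" functionality.

I am struggling to find an async option in the documentation. Can someone help me understand how to change this code to make my puts as "fire and forget" as possible? Thanks!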

private static final String ORG_CACHE = IgniteCache.class.getSimpleName() + "Organizations";

public static void main(String[] args) throws Exception {
    DataRegionConfiguration dfltDataRegConf = new DataRegionConfiguration();
    dfltDataRegConf.setPersistenceEnabled(true);

    DataStorageConfiguration dsCfg = new DataStorageConfiguration();
    dsCfg.setDefaultDataRegionConfiguration(dfltDataRegConf);
    dsCfg.setStoragePath("/home/kazakov/tmp");

    IgniteConfiguration igniteConfiguration = new IgniteConfiguration();
    igniteConfiguration.setDataStorageConfiguration(dsCfg);

    TcpDiscoverySpi tcpDiscoverySpi = new TcpDiscoverySpi();
    TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
    ipFinder.setAddresses(Arrays.asList("127.0.0.1:47500..47509"));
    tcpDiscoverySpi.setIpFinder(ipFinder);

    igniteConfiguration.setDiscoverySpi(tcpDiscoverySpi);

    try(Ignite ignite = Ignition.start(igniteConfiguration)) {
        ignite.active(true);

        CacheConfiguration<Long, X12File> cacheCfg = new CacheConfiguration<>(ORG_CACHE);

        cacheCfg.setCacheMode(CacheMode.REPLICATED);
        cacheCfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
        cacheCfg.setBackups(1);
        cacheCfg.setOnheapCacheEnabled(true);

        IgniteCache<Long, X12File> cache = ignite.getOrCreateCache(cacheCfg).withExpiryPolicy(new CreatedExpiryPolicy(new Duration(TimeUnit.SECONDS, 1)));

        for (long i = 0; i < 4_000_000; i++) {
            if (i > 0 && i % 10_000 == 0)
                System.out.println("Done: " + i);

            // This put should be asynchronous ("fire and forget") so it does not block the caller.
            cache.put(i, new X12File("x12file" + i, LocalDateTime.now().toString()));
        }

        Thread.sleep(5000);

        int matches = 0;
        for (long i = 0; i < 4_000_000; i++) {
            if (cache.get(i) != null)
                ++matches;
        }
        System.out.println("Matches: " + matches);
    }
}


Answer 1

Score: 3

Check out IgniteCache#putAsync(..).

See: https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/IgniteCache.html#putAsync-K-V-
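
To illustrate the fire-and-forget pattern that putAsync makes possible, here is a minimal sketch. In Ignite 2.x, IgniteCache#putAsync returns an IgniteFuture<Void> instead of blocking until the put completes. To stay runnable without a cluster, the sketch does not use Ignite at all: AsyncCache, InMemoryAsyncCache, FireAndForgetWriter, and FireAndForgetDemo are hypothetical stand-ins built on java.util.concurrent, and CompletableFuture#whenComplete plays the role a listener on the Ignite future would play. Capping the number of in-flight puts is an assumption added here, on the reasoning that a 4,000,000-iteration loop probably should not queue work without bound.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a cache with an asynchronous put; NOT the Ignite API. */
interface AsyncCache<K, V> {
    CompletableFuture<Void> putAsync(K key, V val);
}

/** In-memory stand-in that completes each put on a background thread. */
final class InMemoryAsyncCache<K, V> implements AsyncCache<K, V> {
    final Map<K, V> store = new ConcurrentHashMap<>();
    private final ExecutorService pool;

    InMemoryAsyncCache(ExecutorService pool) {
        this.pool = pool;
    }

    @Override
    public CompletableFuture<Void> putAsync(K key, V val) {
        return CompletableFuture.runAsync(() -> store.put(key, val), pool);
    }
}

/** Issues puts without waiting for each one, but caps how many are in flight. */
final class FireAndForgetWriter<K, V> {
    private final AsyncCache<K, V> cache;
    private final Semaphore inFlight;
    private final int maxInFlight;
    private final AtomicLong failures = new AtomicLong();

    FireAndForgetWriter(AsyncCache<K, V> cache, int maxInFlight) {
        this.cache = cache;
        this.maxInFlight = maxInFlight;
        this.inFlight = new Semaphore(maxInFlight);
    }

    void put(K key, V val) throws InterruptedException {
        inFlight.acquire(); // blocks only when the in-flight cap is reached
        CompletableFuture<Void> fut;
        try {
            fut = cache.putAsync(key, val);
        } catch (RuntimeException e) {
            failures.incrementAndGet();
            inFlight.release();
            return;
        }
        // Record the outcome in the background; the caller never waits on it.
        fut.whenComplete((ignored, err) -> {
            if (err != null)
                failures.incrementAndGet();
            inFlight.release();
        });
    }

    /** Waits until every outstanding put has completed, or the timeout elapses. */
    boolean drain(long timeout, TimeUnit unit) throws InterruptedException {
        if (!inFlight.tryAcquire(maxInFlight, timeout, unit))
            return false;
        inFlight.release(maxInFlight);
        return true;
    }

    long failures() {
        return failures.get();
    }
}

final class FireAndForgetDemo {
    /** Writes n entries and returns how many landed in the stand-in store. */
    static int run(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            InMemoryAsyncCache<Long, String> cache = new InMemoryAsyncCache<>(pool);
            FireAndForgetWriter<Long, String> writer = new FireAndForgetWriter<>(cache, 1_000);
            for (long i = 0; i < n; i++)
                writer.put(i, "x12file" + i); // returns without waiting for the write
            if (!writer.drain(30, TimeUnit.SECONDS) || writer.failures() > 0)
                throw new IllegalStateException("puts did not complete cleanly");
            return cache.store.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Stored: " + run(10_000));
    }
}
```

With a real IgniteCache, the same shape would presumably wrap cache.putAsync(key, val). Ignoring the returned future entirely means failed puts go unnoticed, which is why the sketch counts them rather than discarding the result.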

Posted by huangapple on 2020-10-23 05:26:29
Source: https://go.coder-hub.com/64490772.html