如何通过JMS缓存连接工厂增加插入到MQ的线程/任务数量?

huangapple go评论62阅读模式
英文:

how to increase the number of threads/task inserting into MQ using JMS caching connection factory?

问题

我有一个Kafka消费者,有10个消费者线程从10个分区中消费数据,然后使用JMS模板缓存连接工厂(SessionCacheSize(10))将其插入到消息队列中。消息队列插入部分是异步的。因此,Kafka消费者部分和消息队列插入部分都是以异步方式进行的。但是,我在这里遇到的问题是,我的Kafka消费者层能够在1分钟内消费10万个事件,而在将数据插入消息队列时,每次只有8个线程或任务在运行。我们需要帮助增加插入消息队列的线程和实例的数量。

以下是我的JMSTemplate缓存连接工厂代码,将由另一个方法调用来插入消息队列。

@Configuration
public class MQutil {

    @Bean
    public CachingConnectionFactory cachingConnectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        try{
            System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings", "false");
            // 创建具有MQ属性的MQConnectionFactory
            MQConnectionFactory mqConnectionFactory = new MQConnectionFactory();
            mqConnectionFactory.setHostName("ca");
            mqConnectionFactory.setPort(1414);
            mqConnectionFactory.setQueueManager("QMGR");
            mqConnectionFactory.setChannel("CHANNEL");
            mqConnectionFactory.setSSLCipherSuite("TLS_RSA_WITH_AES_128_CBC_SHA256");
            mqConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);

            UserCredentialsConnectionFactoryAdapter connectionFactoryAdapter = new UserCredentialsConnectionFactoryAdapter();
            connectionFactoryAdapter.setTargetConnectionFactory(mqConnectionFactory);
            connectionFactoryAdapter.setUsername("dmin");
            connectionFactoryAdapter.setPassword("21ff");

            cachingConnectionFactory.setTargetConnectionFactory(connectionFactoryAdapter);
            cachingConnectionFactory.setSessionCacheSize(10);

        } catch (Exception e) {
            System.out.println("Exception Inserting Message Into MQ " + e);
        }
        return cachingConnectionFactory;
    }

    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory());
        jmsTemplate.setDefaultDestinationName("TEST.TOPIC.QUEUE");
        return jmsTemplate;
    }
}

以下方法调用JMSTemplate来插入消息队列。

@Async
public void insertIntoMQ(String kafkaMessage) {
    try{
        jmsTemplate.convertAndSend(kafkaMessage); // 如果要使用默认目标名称,请使用此方法
        log.info("Message Inserted Successfully :\n" + kafkaMessage);
    } catch (Exception e) {
        log.error("Exception Inserting Message Into MQ ", e);
    }
}

以下是我的Kafka消费者配置:

Properties config = new Properties();
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokersUrl);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);

需要帮助增加插入消息队列的线程和任务的数量。

英文:

I have a Kafka consumer with 10 consumer threads consuming from 10 partitions and inserting into MQ using the JMS template caching connection factory (SessionCacheSize(10)). The MQ insertion part is ASYNC here. So, the Kafka consumer part and MQ insertion part happen in ASYNC mode, but what I'm experiencing here is that my Kafka consumer layer is able to consume 100k events within 1 min, whereas only 8 threads or tasks are running at any given point in time when inserting into MQ. We need help increasing the number of threads and instances inserting into MQ.

below is my JMSTemplate caching connection factory code which will be called by another method to insert into MQ.

@Configuration
public class MQutil {

    @Bean
    public CachingConnectionFactory cachingConnectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        try{
            System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings", "false");
            // Create an MQConnectionFactory with the MQ properties
            MQConnectionFactory mqConnectionFactory = new MQConnectionFactory();
            mqConnectionFactory.setHostName("ca");
            mqConnectionFactory.setPort(1414);
            mqConnectionFactory.setQueueManager("QMGR");
            mqConnectionFactory.setChannel("CHANNEL");
            mqConnectionFactory.setSSLCipherSuite("TLS_RSA_WITH_AES_128_CBC_SHA256");
            mqConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);

            UserCredentialsConnectionFactoryAdapter connectionFactoryAdapter=new UserCredentialsConnectionFactoryAdapter();
            connectionFactoryAdapter.setTargetConnectionFactory(mqConnectionFactory);
            connectionFactoryAdapter.setUsername("dmin");
            connectionFactoryAdapter.setPassword("21ff");

            cachingConnectionFactory.setTargetConnectionFactory(connectionFactoryAdapter);
            cachingConnectionFactory.setSessionCacheSize(10);

        } catch (Exception e) {

            System.out.println("Exception Inserting Message Into MQ "+ e);
        }
        return cachingConnectionFactory;
    }

    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory());
        jmsTemplate.setDefaultDestinationName("TEST.TOPIC.QUEUE");
        return jmsTemplate;
    }
}

the below method calls the JMSTemplate

@Async
    public void insertIntoMQ(String kafkaMessage) {
        //log.info("Inside insertIntoMQ function");
        try{
 
            jmsTemplate.convertAndSend(kafkaMessage); // use this if you want to use the default destination name
            log.info("Message Inserted Successfully :\n" + kafkaMessage);
        } catch (Exception e) {

            log.error("Exception Inserting Message Into MQ ", e);
        }
    }

below are my Kafka consumer configuration

Properties config = new Properties();
		config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokersUrl);
		config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
		config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
		 config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
		 config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

		config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);

Need help to increase the number of threads/tasks inserting into MQ.

答案1

得分: 0

I have updated the code to perform synchronously instead of asynchronously, which has increased the number of threads from 8 tasks to 10 tasks, matching the number of consumer stream threads. Additionally, I improved performance by configuring certain settings, and the achieved throughput is 850 TPS.

@Configuration
public class MQutil {
    
    @Bean
    public QosSettings qos_Settings() {
        QosSettings qosSettings = new QosSettings();
        qosSettings.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        return qosSettings;
    }

    @Bean
    public CachingConnectionFactory cachingConnectionFactory() {
        
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        try{
            
            System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings", "false");
            // Create an MQConnectionFactory with the MQ properties
            MQConnectionFactory mqConnectionFactory = new MQConnectionFactory();
            mqConnectionFactory.setHostName("cdopla-854.bell.corp.bce.ca");
            mqConnectionFactory.setPort(1414);
            mqConnectionFactory.setQueueManager("ESBUATQMGR");
            mqConnectionFactory.setChannel("ESIDE.CHANNEL");
            mqConnectionFactory.setSSLCipherSuite("TLS_RSA_WITH_AES_128_CBC_SHA256");
            mqConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);

            UserCredentialsConnectionFactoryAdapter connectionFactoryAdapter=new UserCredentialsConnectionFactoryAdapter();
            connectionFactoryAdapter.setTargetConnectionFactory(mqConnectionFactory);
            connectionFactoryAdapter.setUsername("esideadmin");
            connectionFactoryAdapter.setPassword("esbcan21ff");

            cachingConnectionFactory.setTargetConnectionFactory(connectionFactoryAdapter);
            cachingConnectionFactory.setSessionCacheSize(10);

        } catch (Exception e) {

            System.out.println("Exception Inserting Message Into MQ "+ e);
        }
        return cachingConnectionFactory;
    }
    

    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory());
        jmsTemplate.setDefaultDestinationName("TEST.TOPIC.QUEUE2");
        jmsTemplate.setExplicitQosEnabled(true);
        //jmsTemplate.setQosSettings(qos_Settings()); -- increase performance double-fold by getting acknowledgment at the main memory state instead of disk storage persist state
        jmsTemplate.setSessionTransacted(false);
        jmsTemplate.setSessionAcknowledgeMode(JMSContext.AUTO_ACKNOWLEDGE);
        return jmsTemplate;
    }
}
英文:

I have updated the Code to perform synchronously instead of Async which helps in getting thosee 2 remaining threads from 8 Task to 10 Task which is same as my no of consumer stream threads and additionaly , increased performance by using few configs like below and was able to achieve 850 TPS.

@Configuration

public class MQutil {

@Bean
public QosSettings qos_Settings() {
QosSettings qosSettings = new QosSettings();
qosSettings.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
return qosSettings;
}
@Bean
public CachingConnectionFactory cachingConnectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
try{
System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings", "false");
// Create an MQConnectionFactory with the MQ properties
MQConnectionFactory mqConnectionFactory = new MQConnectionFactory();
mqConnectionFactory.setHostName("cdopla-854.bell.corp.bce.ca");
mqConnectionFactory.setPort(1414);
mqConnectionFactory.setQueueManager("ESBUATQMGR");
mqConnectionFactory.setChannel("ESIDE.CHANNEL");
mqConnectionFactory.setSSLCipherSuite("TLS_RSA_WITH_AES_128_CBC_SHA256");
mqConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
UserCredentialsConnectionFactoryAdapter connectionFactoryAdapter=new UserCredentialsConnectionFactoryAdapter();
connectionFactoryAdapter.setTargetConnectionFactory(mqConnectionFactory);
connectionFactoryAdapter.setUsername("esideadmin");
connectionFactoryAdapter.setPassword("esbcan21ff");
cachingConnectionFactory.setTargetConnectionFactory(connectionFactoryAdapter);
cachingConnectionFactory.setSessionCacheSize(10);
} catch (Exception e) {
System.out.println("Exception Inserting Message Into MQ "+ e);
}
return cachingConnectionFactory;
}
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory());
jmsTemplate.setDefaultDestinationName("TEST.TOPIC.QUEUE2");
jmsTemplate.setExplicitQosEnabled(true);
//jmsTemplate.setQosSettings(qos_Settings()); -- increase performace double fold by getting ack at main mem state instead of disk storage persist state
jmsTemplate.setSessionTransacted(false);
jmsTemplate.setSessionAcknowledgeMode(JMSContext.AUTO_ACKNOWLEDGE);
return jmsTemplate;
}

}

huangapple
  • 本文由 发表于 2023年5月10日 20:26:50
  • 转载请务必保留本文链接:https://go.coder-hub.com/76218415.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定