AbstractElasticsearchRepository : Cannot create index: Connection refused; nested exception is java.lang.RuntimeException: Connection refused
Question
I am implementing a microservices project that integrates Kafka, Elasticsearch and Kibana. I have configured a network for communication between the containers, but I can't get the data collection microservice to communicate with the container running the Elasticsearch OSS image, version 7.6.2.
I can see the following log when launching the Spring Boot microservice:
INFO 1 --- [ main] o.s.d.elasticsearch.support.VersionInfo : Version Spring Data Elasticsearch: 4.0.2.RELEASE
INFO 1 --- [ main] o.s.d.elasticsearch.support.VersionInfo : Version Elasticsearch Client in build: 7.6.2
INFO 1 --- [ main] o.s.d.elasticsearch.support.VersionInfo : Version Elasticsearch Client used: 7.6.2
WARN 1 --- [ main] .d.e.r.s.AbstractElasticsearchRepository : Cannot create index: Connection refused; nested exception is java.lang.RuntimeException: Connection refused
Below is the ElasticSearch container configuration:
elasticsearch:
  image: docker.elastic.co/elasticsearch/elasticsearch-oss:7.6.2
  environment:
    - discovery.type=single-node
    - cluster.name=covid-tweets-es-cluster
    - bootstrap.memory_lock=true
    - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
  ulimits:
    memlock:
      soft: -1
      hard: -1
  volumes:
    - elasticsearch-data:/usr/share/elasticsearch/data
  ports:
    - 9300:9300
    - 9200:9200
  networks:
    - covid_processor_network
This is the Spring Data ElasticSearch configuration of the microservice:
## Kafka Binder Props
spring.cloud.stream.kafka.binder.brokers: kafka:9092
spring.cloud.stream.kafka.binder.auto-create-topics: false
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset: latest
spring.cloud.stream.bindings.processed-tweets.group: tweets-collector
## Persistence
spring.data.elasticsearch.repositories.enabled: true
spring.data.elasticsearch.cluster-nodes: elasticsearch:9300
spring.data.elasticsearch.cluster-name: covid-tweets-es-cluster
spring.data.elasticsearch.rest.uris: elasticsearch:9200
From the microservice container I can connect to the Elasticsearch container.
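One way to double-check that kind of reachability from inside the microservice container is a plain TCP probe. The following is a minimal sketch that uses only the JDK. The class name `PortProbe` and the 2-second timeout are illustrative assumptions and not part of the project; `elasticsearch` is the Compose service name from the configuration above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers "Connection refused", timeouts and host names that do not resolve.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed default host: the Compose service name; pass another host as the first argument.
        String host = args.length > 0 ? args[0] : "elasticsearch";
        for (int port : new int[] {9200, 9300}) {
            System.out.printf("%s:%d reachable=%b%n", host, port, isReachable(host, port, 2000));
        }
    }
}
```

A successful probe only shows that something accepts connections on that host and port. It does not show which address the Spring client is actually configured to use.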
This is the client code. It is quite simple: it just uses a TweetsRepository that implements ElasticsearchRepository:
@StreamListener(AppStreamsConfig.PROCESSED_TWEETS_CHANNEL)
public void onNewProcessedTweet(
        @Payload final TweetDTO newProcessedTweet,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
        @Header(KafkaHeaders.OFFSET) Long offset,
        @Header(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT) Integer deliveryAttempt) {
    log.info("NewsProcessedTweet with id '{}' and text '{}' received from bus. topic: {}, partition: {}, offset: {}, deliveryAttempt: {}",
            newProcessedTweet.getId(), newProcessedTweet.getText(), topic, partition, offset, deliveryAttempt);
    try {
        tweetService.save(newProcessedTweet);
    } catch (final Exception ex) {
        ex.printStackTrace();
        log.error("Collect Tweet Exception -> " + ex.getMessage());
    }
}

@Service("tweetsService")
@RequiredArgsConstructor
public class TweetsServiceImpl implements ITweetsService {

    /**
     * Tweets Repository
     */
    private final TweetsRepository tweetsRepository;
    private final TweetDtoMapper tweetDtoMapper;

    @Override
    public void save(TweetDTO tweetDto) {
        Assert.notNull(tweetDto, "Tweet can not be null");
        final TweetEntity tweetToSave = tweetDtoMapper.dtoToEntity(tweetDto);
        tweetsRepository.save(tweetToSave);
    }
}
Does anyone know what I am doing wrong? Thank you.
Answer 1
Score: 1
Which client are you using: RestHighLevelClient or TransportClient?
If you are using RestHighLevelClient, try port 9200.
Please also share the client usage code so the problem is easier to understand.
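For context on the port distinction: 9200 is Elasticsearch's default HTTP port, which REST-based clients such as RestHighLevelClient talk to, while 9300 is the default transport port used by TransportClient. A quick way to confirm that the REST port answers HTTP is a small JDK-only check like the sketch below (Java 11+). It deliberately does not use the Spring or Elasticsearch client libraries; the class name `RestPortCheck`, the timeouts and the default URI `http://elasticsearch:9200/` are assumptions made for illustration.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

final class RestPortCheck {

    /** Sends GET to the given base URI and returns the HTTP status code. */
    static int rootStatus(String baseUri) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(2))
                .build();
        HttpRequest request = HttpRequest.newBuilder(URI.create(baseUri))
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
        // The response body is not needed here, so it is discarded.
        return client.send(request, HttpResponse.BodyHandlers.discarding()).statusCode();
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumed default: the Compose service name and the REST port from the question.
        String baseUri = args.length > 0 ? args[0] : "http://elasticsearch:9200/";
        try {
            System.out.println(baseUri + " -> HTTP " + rootStatus(baseUri));
        } catch (IOException e) {
            System.out.println(baseUri + " -> not reachable: " + e);
        }
    }
}
```

If this check succeeds from inside the microservice container while the repository still logs "Connection refused", the address the Spring client actually resolves is worth inspecting, since it may differ from the one tested here.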