英文:
Spring Cloud Stream Source - Not Pulling
问题
这是你提供的内容的翻译:
我正在尝试在Spring Cloud Dataflow中为概念验证开发自定义的数据源(Source)。
我成功地进行了部署,但似乎bean没有被拉取。
这是父级 pom.xml 的一部分:
...
<properties>
<spring-cloud.version>Hoxton.SR8</spring-cloud.version>
...
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
...
这是项目的 pom.xml:
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cloud-connectors</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.10.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.3.4.RELEASE</version>
<configuration>
<excludes>
<exclude>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-app-starter-metadata-maven-plugin</artifactId>
<version>2.0.2.RELEASE</version>
<executions>
<execution>
<id>aggregate-metadata</id>
<phase>compile</phase>
<goals>
<goal>aggregate-metadata</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
SourceApplication.java
@SpringBootApplication
@EnableBinding(Source.class)
@EnableConfigurationProperties(ReportingProperties.class)
public class SourceApplication {
public static void main(String[] args) {
SpringApplication.run(SourceApplication.class, args);
}
}
ReportingProperties.java
@Validated
@ConfigurationProperties("reporting-properties")
public class ReportingProperties {
/**
* 报告的起始日期。
*/
private LocalDateTime fromDate = LocalDateTime.now().minusDays(1000);
/**
* 报告的结束日期。
*/
private LocalDateTime toDate = LocalDateTime.now();
public LocalDateTime getFromDate() {
return fromDate;
}
public ReportingProperties setFromDate(LocalDateTime fromDate) {
this.fromDate = fromDate;
return this;
}
public LocalDateTime getToDate() {
return toDate;
}
public ReportingProperties setToDate(LocalDateTime toDate) {
this.toDate = toDate;
return this;
}
}
最后是服务代码:
@Configuration
@EnableBinding(Source.class)
public class PullUsersService {
@Bean
@Publisher(channel = Source.OUTPUT)
@SendTo(Source.OUTPUT)
public Supplier<String> pullUsers() {
return () -> "Test";
}
}
我想知道如何触发拉取机制,以便在部署后可以在日志中看到 "Test"(我相信在SCDF上一切都设置正确,如果我执行 "time | log",我可以在日志中看到一些结果,但如果我执行 "myservice | log",没有任何内容显示)。
我做错了什么?(也许我的代码中有一些冗余)
英文:
I'm trying to develop a customized Source for a proof of concept in Spring Cloud Dataflow.
I managed to deploy it correctly, but it seems that the bean is not pulled.
Here's a part of the parent pom.xml
...
<properties>
<spring-cloud.version>Hoxton.SR8</spring-cloud.version>
...
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
...
Here's the project pom.xml
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cloud-connectors</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.10.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.3.4.RELEASE</version>
<configuration>
<excludes>
<exclude>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-app-starter-metadata-maven-plugin</artifactId>
<version>2.0.2.RELEASE</version>
<executions>
<execution>
<id>aggregate-metadata</id>
<phase>compile</phase>
<goals>
<goal>aggregate-metadata</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
SourceApplication.java
@SpringBootApplication
@EnableBinding(Source.class)
@EnableConfigurationProperties(ReportingProperties.class)
public class SourceApplication {
public static void main(String[] args) {
SpringApplication.run(SourceApplication.class, args);
}
}
ReportingProperties.java
@Validated
@ConfigurationProperties("reporting-properties")
public class ReportingProperties {
/**
* The starting date of the reporting.
*/
private LocalDateTime fromDate = LocalDateTime.now().minusDays(1000);
/**
* The end date of the reporting.
*/
private LocalDateTime toDate = LocalDateTime.now();
public LocalDateTime getFromDate() {
return fromDate;
}
public ReportingProperties setFromDate(LocalDateTime fromDate) {
this.fromDate = fromDate;
return this;
}
public LocalDateTime getToDate() {
return toDate;
}
public ReportingProperties setToDate(LocalDateTime toDate) {
this.toDate = toDate;
return this;
}
}
And finally the service :
@Configuration
@EnableBinding(Source.class)
public class PullUsersService {
@Bean
@Publisher(channel = Source.OUTPUT)
@SendTo(Source.OUTPUT)
public Supplier<String> pullUsers() {
return () -> "Test";
}
}
I'm wondering how to trigger the pulling mechanism so when deployed I can see "Test" in the logs
(I believe everything is setup correctly on SCDF, if I do "time | log" i can see some results in the log, but if I do "myservice | log" nothing appears.
What am I doing wrong ? (maybe there's some redundancy in my code)
答案1
得分: 1
以下是翻译好的部分:
注意:删除所有的 @EnableBinding(Source.class) 注解
@Component
public class PullUsersService {
@PollableBean
public Supplier<String> pullUsers() {
return () -> "Test";
}
}
根据您的绑定器配置,如果您想要将队列添加到 RabbitMQ 或 Kafka 中,请添加以下配置:
spring.cloud.function.definition=pullUsers
spring.cloud.stream.bindings.pullUsers-out-0.destination=users
spring.cloud.stream.bindings.pullUsers-out-0.group=users-service
spring.cloud.stream.bindings.pullUsers-out-0.producer.requiredGroups=users-service
spring.cloud.stream.bindings.pullUsers-out-0.producer.requiredGroups 配置会强制生产者创建队列。
英文:
The answer above is correct, you can also do it the following way:
Note: Remove all @EnableBinding(Source.class) anotations
@Component
public class PullUsersService {
@PollableBean
public Supplier<String> pullUsers() {
return () -> "Test";
}
}
Add the following configurations if you want queue to rabbitMq or kafka based on your binder configuration
spring.cloud.function.definition=pullUsers
spring.cloud.stream.bindings.pullUsers-out-0.destination=users
spring.cloud.stream.bindings.pullUsers-out-0.group=users-service
spring.cloud.stream.bindings.pullUsers-out-0.producer.requiredGroups=users-service
spring.cloud.stream.bindings.pullUsers-out-0.producer.requiredGroups configuration forces producer to create the queues
答案2
得分: 0
这很有趣,是什么促使你使用了这个:
@Publisher(channel = Source.OUTPUT)
@SendTo(Source.OUTPUT)
如果你查看你提到的那个time
源代码,你会看到类似这样的内容:
@PollableSource
public String publishTime() {
return new SimpleDateFormat(this.triggerProperties.getDateFormat()).format(new Date());
}
考虑使用@PollableSource
替代,并且不再使用Supplier
。
关键是你目前的所有注解都无法进行轮询。
@Publisher
只在我们调用该方法时起作用。
@SendTo
在这里完全被忽略,因为@Publisher
执行的正是相同的操作,它“发送至”。
英文:
That's interesting what made you to use this:
@Publisher(channel = Source.OUTPUT)
@SendTo(Source.OUTPUT)
If you take a look into that time
source code you mention, you'll see something like this:
@PollableSource
public String publishTime() {
return new SimpleDateFormat(this.triggerProperties.getDateFormat()).format(new Date());
}
Consider to use that @PollableSource
instead and don't use a Supplier
.
The point is that all your current annotations do nothing with polling.
The @Publisher
works only when we call the method.
The @SendTo
is fully ignored here since @Publisher
does exactly the same and it "sends to".
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论