春季云流源 – 未拉取

huangapple go评论78阅读模式
英文:

Spring Cloud Stream Source - Not Pulling

问题

这是你提供的内容的翻译:

我正在尝试在Spring Cloud Dataflow中为概念验证开发自定义的数据源(Source)。

我成功地进行了部署,但似乎bean没有被拉取。

这是父级 pom.xml 的一部分:

...
<properties>
  <spring-cloud.version>Hoxton.SR8</spring-cloud.version>
  ...
</properties>

<dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-dependencies</artifactId>
        <version>${spring-cloud.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
</dependencyManagement>
...

这是项目的 pom.xml

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-cloud-connectors</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.3.10.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <version>2.3.4.RELEASE</version>
            <configuration>
                <excludes>
                    <exclude>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-configuration-processor</artifactId>
                    </exclude>
                </excludes>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-app-starter-metadata-maven-plugin</artifactId>
            <version>2.0.2.RELEASE</version>
            <executions>
                <execution>
                    <id>aggregate-metadata</id>
                    <phase>compile</phase>
                    <goals>
                        <goal>aggregate-metadata</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

SourceApplication.java

@SpringBootApplication
@EnableBinding(Source.class)
@EnableConfigurationProperties(ReportingProperties.class)
public class SourceApplication {

    public static void main(String[] args) {
        SpringApplication.run(SourceApplication.class, args);
    }
}

ReportingProperties.java

@Validated
@ConfigurationProperties("reporting-properties")
public class ReportingProperties {

    /**
     * 报告的起始日期。
     */
    private LocalDateTime fromDate = LocalDateTime.now().minusDays(1000);

    /**
     * 报告的结束日期。
     */
    private LocalDateTime toDate = LocalDateTime.now();

    public LocalDateTime getFromDate() {
        return fromDate;
    }

    public ReportingProperties setFromDate(LocalDateTime fromDate) {
        this.fromDate = fromDate;
        return this;
    }

    public LocalDateTime getToDate() {
        return toDate;
    }

    public ReportingProperties setToDate(LocalDateTime toDate) {
        this.toDate = toDate;
        return this;
    }
}

最后是服务代码:

@Configuration
@EnableBinding(Source.class)
public class PullUsersService {

    @Bean
    @Publisher(channel = Source.OUTPUT)
    @SendTo(Source.OUTPUT)
    public Supplier<String> pullUsers() {
        return () -> "Test";
    }

}

我想知道如何触发拉取机制,以便在部署后可以在日志中看到 "Test"(我相信在SCDF上一切都设置正确,如果我执行 "time | log",我可以在日志中看到一些结果,但如果我执行 "myservice | log",没有任何内容显示)。

我做错了什么?(也许我的代码中有一些冗余)

英文:

I'm trying to develop a customized Source for a proof of concept in Spring Cloud Dataflow.

I managed to deploy it correctly, but it seems that the bean is not pulled.

Here's a part of the parent pom.xml

...
&lt;properties&gt;
  &lt;spring-cloud.version&gt;Hoxton.SR8&lt;/spring-cloud.version&gt;
  ...
&lt;/properties&gt;

&lt;dependencyManagement&gt;
    &lt;dependencies&gt;
      &lt;dependency&gt;
        &lt;groupId&gt;org.springframework.cloud&lt;/groupId&gt;
        &lt;artifactId&gt;spring-cloud-dependencies&lt;/artifactId&gt;
        &lt;version&gt;${spring-cloud.version}&lt;/version&gt;
        &lt;type&gt;pom&lt;/type&gt;
        &lt;scope&gt;import&lt;/scope&gt;
      &lt;/dependency&gt;
    &lt;/dependencies&gt;
&lt;/dependencyManagement&gt;
...

Here's the project pom.xml

&lt;dependencies&gt;
    &lt;dependency&gt;
        &lt;groupId&gt;org.springframework.cloud&lt;/groupId&gt;
        &lt;artifactId&gt;spring-cloud-stream&lt;/artifactId&gt;
    &lt;/dependency&gt;
    &lt;dependency&gt;
        &lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
        &lt;artifactId&gt;spring-boot-starter-actuator&lt;/artifactId&gt;
    &lt;/dependency&gt;
    &lt;dependency&gt;
        &lt;groupId&gt;org.springframework.cloud&lt;/groupId&gt;
        &lt;artifactId&gt;spring-cloud-stream-binder-kafka&lt;/artifactId&gt;
    &lt;/dependency&gt;
    &lt;dependency&gt;
        &lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
        &lt;artifactId&gt;spring-boot-starter-cloud-connectors&lt;/artifactId&gt;
    &lt;/dependency&gt;
    &lt;dependency&gt;
        &lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
        &lt;artifactId&gt;spring-boot-starter-web&lt;/artifactId&gt;
    &lt;/dependency&gt;
    &lt;dependency&gt;
        &lt;groupId&gt;org.springframework.kafka&lt;/groupId&gt;
        &lt;artifactId&gt;spring-kafka&lt;/artifactId&gt;
        &lt;version&gt;2.3.10.RELEASE&lt;/version&gt;
    &lt;/dependency&gt;
    &lt;dependency&gt;
        &lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
        &lt;artifactId&gt;spring-boot-configuration-processor&lt;/artifactId&gt;
        &lt;optional&gt;true&lt;/optional&gt;
    &lt;/dependency&gt;
&lt;/dependencies&gt;

&lt;build&gt;
    &lt;plugins&gt;
        &lt;plugin&gt;
            &lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
            &lt;artifactId&gt;spring-boot-maven-plugin&lt;/artifactId&gt;
            &lt;version&gt;2.3.4.RELEASE&lt;/version&gt;
            &lt;configuration&gt;
                &lt;excludes&gt;
                    &lt;exclude&gt;
                        &lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
                        &lt;artifactId&gt;spring-boot-configuration-processor&lt;/artifactId&gt;
                    &lt;/exclude&gt;
                &lt;/excludes&gt;
            &lt;/configuration&gt;
        &lt;/plugin&gt;
        &lt;plugin&gt;
            &lt;groupId&gt;org.springframework.cloud&lt;/groupId&gt;
            &lt;artifactId&gt;spring-cloud-app-starter-metadata-maven-plugin&lt;/artifactId&gt;
            &lt;version&gt;2.0.2.RELEASE&lt;/version&gt;
            &lt;executions&gt;
                &lt;execution&gt;
                    &lt;id&gt;aggregate-metadata&lt;/id&gt;
                    &lt;phase&gt;compile&lt;/phase&gt;
                    &lt;goals&gt;
                        &lt;goal&gt;aggregate-metadata&lt;/goal&gt;
                    &lt;/goals&gt;
                &lt;/execution&gt;
            &lt;/executions&gt;
        &lt;/plugin&gt;
    &lt;/plugins&gt;
&lt;/build&gt;

SourceApplication.java

@SpringBootApplication
@EnableBinding(Source.class)
@EnableConfigurationProperties(ReportingProperties.class)
public class SourceApplication {

    public static void main(String[] args) {
        SpringApplication.run(SourceApplication.class, args);
    }
}

ReportingProperties.java

@Validated
@ConfigurationProperties(&quot;reporting-properties&quot;)
public class ReportingProperties {

    /**
     * The starting date of the reporting.
     */
    private LocalDateTime fromDate = LocalDateTime.now().minusDays(1000);

    /**
     * The end date of the reporting.
     */
    private LocalDateTime toDate = LocalDateTime.now();

    public LocalDateTime getFromDate() {
        return fromDate;
    }

    public ReportingProperties setFromDate(LocalDateTime fromDate) {
        this.fromDate = fromDate;
        return this;
    }

    public LocalDateTime getToDate() {
        return toDate;
    }

    public ReportingProperties setToDate(LocalDateTime toDate) {
        this.toDate = toDate;
        return this;
    }
}

And finally the service :

@Configuration
@EnableBinding(Source.class)
public class PullUsersService {

    @Bean
    @Publisher(channel = Source.OUTPUT)
    @SendTo(Source.OUTPUT)
    public Supplier&lt;String&gt; pullUsers() {
        return () -&gt; &quot;Test&quot;;
    }

}

I'm wondering how to trigger the pulling mechanism so when deployed I can see "Test" in the logs
(I believe everything is setup correctly on SCDF, if I do "time | log" i can see some results in the log, but if I do "myservice | log" nothing appears.

What am I doing wrong ? (maybe there's some redundancy in my code)

答案1

得分: 1

以下是翻译好的部分:

注意:删除所有的 @EnableBinding(Source.class) 注解

@Component
public class PullUsersService {

    @PollableBean
    public Supplier<String> pullUsers() {
        return () -> "Test";
    }
}

根据您的绑定器配置,如果您想要将队列添加到 RabbitMQ 或 Kafka 中,请添加以下配置:

spring.cloud.function.definition=pullUsers
spring.cloud.stream.bindings.pullUsers-out-0.destination=users
spring.cloud.stream.bindings.pullUsers-out-0.group=users-service
spring.cloud.stream.bindings.pullUsers-out-0.producer.requiredGroups=users-service

spring.cloud.stream.bindings.pullUsers-out-0.producer.requiredGroups 配置会强制生产者创建队列。

英文:

The answer above is correct, you can also do it the following way:

Note: Remove all @EnableBinding(Source.class) anotations

@Component
public class PullUsersService {

    @PollableBean
    public Supplier&lt;String&gt; pullUsers() {
        return () -&gt; &quot;Test&quot;;
    }
}

Add the following configurations if you want queue to rabbitMq or kafka based on your binder configuration

spring.cloud.function.definition=pullUsers
spring.cloud.stream.bindings.pullUsers-out-0.destination=users
spring.cloud.stream.bindings.pullUsers-out-0.group=users-service
spring.cloud.stream.bindings.pullUsers-out-0.producer.requiredGroups=users-service

spring.cloud.stream.bindings.pullUsers-out-0.producer.requiredGroups configuration forces producer to create the queues

答案2

得分: 0

这很有趣,是什么促使你使用了这个:

@Publisher(channel = Source.OUTPUT)
@SendTo(Source.OUTPUT)

如果你查看你提到的那个time源代码,你会看到类似这样的内容:

@PollableSource
public String publishTime() {
    return new SimpleDateFormat(this.triggerProperties.getDateFormat()).format(new Date());
}

考虑使用@PollableSource 替代,并且不再使用Supplier
关键是你目前的所有注解都无法进行轮询。

@Publisher 只在我们调用该方法时起作用。
@SendTo 在这里完全被忽略,因为@Publisher 执行的正是相同的操作,它“发送至”。

英文:

That's interesting what made you to use this:

@Publisher(channel = Source.OUTPUT)
@SendTo(Source.OUTPUT)

If you take a look into that time source code you mention, you'll see something like this:

@PollableSource
public String publishTime() {
	return new SimpleDateFormat(this.triggerProperties.getDateFormat()).format(new Date());
}

Consider to use that @PollableSource instead and don't use a Supplier.
The point is that all your current annotations do nothing with polling.

The @Publisher works only when we call the method.
The @SendTo is fully ignored here since @Publisher does exactly the same and it "sends to".

huangapple
  • 本文由 发表于 2020年10月15日 16:56:11
  • 转载请务必保留本文链接:https://go.coder-hub.com/64368083.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定