如何以编程方式将文本写入 Flink 套接字?

huangapple go评论65阅读模式
英文:

How to programmatically write text to Flink socket?

问题

以下是翻译好的部分:

我的目标是将字符串“SUCCESS”发送到套接字,并使Apache Flink的DataStream捡起该字符串并使用单词“SUCCESS”更新本地文本文件。我已经设置了一个“Consumer DataStream”,用于监控从终端发送到本地主机上端口9999的文本。请参考下面的代码,了解工作中的Consumer DataStream代码:

package p1;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem.WriteMode;

public class TestClientStreaming {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> textStream = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<String> filteredStream = textStream.filter(new FilterFunction<String>() {
            public boolean filter(String value) throws Exception {
                return value.equals("SUCCESS");
            }
        });

        filteredStream.writeAsText(<FILE-PATH>, WriteMode.OVERWRITE);

        env.execute("Reading Flink Stream");
    }
}

对我来说,这很有效。当我在终端上运行“nc -l 9999”时,位于FILE-PATH的“Consumer Text File”只有在我在终端中键入“SUCCESS”并按下Enter键时才会更新。到此为止都很好!

现在,我想避免使用终端,但仍然想将文本写入此套接字,以便我的“Consumer DataStream”可以捡起它。这个文本可以是任何文本,因为我的“Consumer DataStream”将会过滤掉不需要的文本,只留下“SUCCESS”。

我知道的仅有的两种写入套接字的方法是使用终端(如前面讨论的)或使用一个Producer DataStream,它会持续从“Producer Text File”中读取,并调用DataStream.writeToSocket()将它读取的文本写入套接字。然后我的“Consumer DataStream”会捡起这个文本,按预期运行。

是否有其他选项可以将文本写入套接字,以便DataStream可以捡起这个文本?我尝试使用套接字库将“SUCCESS”写入到localhost:9999,但没有成功。

以下图片可能有助于直观地理解我的解决方案以及我已经解决的问题:https://ibb.co/41GzNWM

谢谢您的时间!

英文:

My goal is to send the string "SUCCESS" to a socket and have Apache Flink's DataStream pick up that string and update a local text file with the word "SUCCESS" I have already set up a "Consumer DataStream" which monitors text being sent from the terminal to port 9999 on my localhost. Please refer to code below for the working Consumer DataStream code:

package p1;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem.WriteMode;

public class TestClientStreaming {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream&lt;String&gt; textStream = env.socketTextStream(&quot;localhost&quot;, 9999);

        SingleOutputStreamOperator&lt;String&gt; filteredStream = textStream.filter(new FilterFunction&lt;String&gt;() {
            public boolean filter(String value) throws Exception {
                return value.equals(&quot;SUCCESS&quot;);
            }
        });

        filteredStream.writeAsText(&lt;FILE-PATH&gt;, WriteMode.OVERWRITE);

        env.execute(&quot;Reading Flink Stream&quot;);
    }
}

This works for me; When I run "nc -l 9999" on my Terminal, the "Consumer Text File" found at FILE-PATH only gets updated if I type "SUCCESS" in the Terminal and press the Enter key. Great so far!

Now, I'd like to avoid using the Terminal but still write text to this socket so my "Consumer DataStream" can pick it up. This text can be any text, as my "Consumer DataStream" will filter out the text for what it wants aka "SUCCESS".

The only two ways I know how to write to a socket are using the terminal (as discussed earlier) or using a Producer DataStream which continually reads from a "Producer Text File" and calls DataStream.writeToSocket() to write the text it reads to the socket. My "Consumer Datastream" would then pick this up, and behave as intended.

Is there any other option that can be done to write a text to socket so that a DataStream can pick this text up? I tried using the socket library to write "SUCCESS" to localhost:9999 but to no avail.

This image might help with visualizing what I'm looking to solve, and what I've already solved: https://ibb.co/41GzNWM

Thank you for your time!

答案1

得分: 0

你需要创建一个等同于 nc -lsocket 服务器

一个示例代码如下:

public class SocketWriter {

    public static void main(String[] args) {
        int port = 9999;
        boolean stop = false;
        ServerSocket serverSocket = null;

        try {
            serverSocket = new ServerSocket(port);

            while (!stop) {
                Socket echoSocket = serverSocket.accept();
                PrintWriter out =
                        new PrintWriter(echoSocket.getOutputStream(), true);
                BufferedReader in =
                        new BufferedReader(
                                new InputStreamReader(echoSocket.getInputStream()));

                out.println("Hello Amazing");
                // 进行你想要的逻辑处理。

                in.close();
                out.close();
                echoSocket.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                serverSocket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
英文:

You need to create an equivalent of nc -l aka socket server.

A sample code would be:

public class SocketWriter {

	public static void main(String[] args) {
		int port = 9999;
		boolean stop = false;
		ServerSocket serverSocket = null;


		try {

			serverSocket = new ServerSocket(port);

			while (!stop) {
				Socket echoSocket = serverSocket.accept();
				PrintWriter out =
						new PrintWriter(echoSocket.getOutputStream(), true);
				BufferedReader in =
						new BufferedReader(
								new InputStreamReader(echoSocket.getInputStream()));

				out.println(&quot;Hello Amazing&quot;);
				// do whatever logic you want.

				in.close();
				out.close();
				echoSocket.close();

			}
		}
		catch (IOException e) {
			e.printStackTrace();
		}
		finally {
			try {
				serverSocket.close();
			}
			catch (IOException e) {
				e.printStackTrace();
			}
		}

	}
}

huangapple
  • 本文由 发表于 2020年5月4日 07:55:29
  • 转载请务必保留本文链接:https://go.coder-hub.com/61583168.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定