如何使调用线程周期性地等待ScheduledExecutorService中的任务完成工作。

huangapple go评论82阅读模式
英文:

How the caller thread wait till task under ScheduledExecutorService finish the job periodicaly

问题

我有一个需求,就是每隔45分钟进行一次 HTTP 调用并更新值。

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;

import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class TokenManagerRunnable implements Runnable{
    String token;

    public String fetchToken() {
        return this.token;
    }

    @Override
    public void run() {
        String result = "";

        HttpPost post = new HttpPost("https://login.microsoftonline.com/{{TenentId}}/oauth2/token");
        List<NameValuePair> urlParameters = new ArrayList<>();
        urlParameters.add(new BasicNameValuePair("grant_type", "client_credentials"));
        urlParameters.add(new BasicNameValuePair("client_id", "some client id"));
        urlParameters.add(new BasicNameValuePair("client_secret", "some secret id"));
        urlParameters.add(new BasicNameValuePair("resource", "https://database.windows.net"));

        try {
            post.setEntity(new UrlEncodedFormEntity(urlParameters));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }

        try (CloseableHttpClient httpClient = HttpClients.createDefault();
             CloseableHttpResponse response = httpClient.execute(post)){

            result = EntityUtils.toString(response.getEntity());
        } catch (IOException e) {
            e.printStackTrace();
        }

        ObjectNode node;
        try {
            node = new ObjectMapper().readValue(result, ObjectNode.class);

            System.out.println(result);

            if (node.has("access_token")) {
                result = node.get("access_token").toString();
            }
            System.out.println(result);
            System.out.println(result.substring(1, result.length()-1));

            // 更新 token
            this.token = result.substring(1, result.length()-1);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

这是我的主要函数:

SQLServerDataSource ds = new SQLServerDataSource();
TokenManagerRunnable tokenManagerRunnable = new TokenManagerRunnable();
ScheduledExecutorService sches = Executors.newScheduledThreadPool(1);
sches.scheduleWithFixedDelay(tokenManagerRunnable, 0, 45, TimeUnit.MINUTES);
System.out.println("获取令牌 ---- " + tokenManagerRunnable.fetchToken());
ds.setAccessToken(tokenManagerRunnable.fetchToken());

try {
    Connection connection = ds.getConnection(); 
    Statement stmt = connection.createStatement();
    ResultSet rs = stmt.executeQuery("select * from [dbo].[CLIENT]"); 
    System.out.println("成功登录");
    
    while(rs.next()) {
        System.out.println(rs.getString(1));
    }
    
} catch(Exception ex) {
    ex.printStackTrace();
}

tokenManagerRunnable.fetchToken() 返回 null,因为 TokenManagerRunnable 类尚未执行。如何实现等待 ScheduledExecutorService 完成任务,以便从 tokenManagerRunnable.fetchToken() 获取值,并在每隔45分钟后将新值设置在 Datasource 中,而不是返回 null

有什么想法吗?

英文:

I have a requirement like Every 45 minute value has to be updated after making http call.

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
public class TokenManagerRunnable implements Runnable{
String token;
public String fetchToken() {
return this.token;
}
@Override
public void run() {
String result = &quot;&quot;;
HttpPost post = new HttpPost(&quot;https://login.microsoftonline.com/{{TenentId}}/oauth2/token&quot;);
List&lt;NameValuePair&gt; urlParameters = new ArrayList&lt;&gt;();
urlParameters.add(new BasicNameValuePair(&quot;grant_type&quot;, &quot;client_credentials&quot;));
urlParameters.add(new BasicNameValuePair(&quot;client_id&quot;, &quot;some client id&quot;));
urlParameters.add(new BasicNameValuePair(&quot;client_secret&quot;, &quot;some secret id&quot;));
urlParameters.add(new BasicNameValuePair(&quot;resource&quot;, &quot;https://database.windows.net&quot;));
try {
post.setEntity(new UrlEncodedFormEntity(urlParameters));
} catch (UnsupportedEncodingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
try (CloseableHttpClient httpClient = HttpClients.createDefault();
CloseableHttpResponse response = httpClient.execute(post)){
result = EntityUtils.toString(response.getEntity());
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
ObjectNode node;
try {
node = new ObjectMapper().readValue(result, ObjectNode.class);
System.out.println(result);
if (node.has(&quot;access_token&quot;)) {
result = node.get(&quot;access_token&quot;).toString();           
}
System.out.println(result);
System.out.println(result.substring(1, result.length()-1));
//updating the token
this.token = result.substring(1, result.length()-1);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

And here is my main function

        SQLServerDataSource ds = new SQLServerDataSource();
TokenManagerRunnable tokenManagerRunnable = new TokenManagerRunnable();
ScheduledExecutorService sches = Executors.newScheduledThreadPool(1);
sches.scheduleWithFixedDelay(tokenManagerRunnable, 0, 45, TimeUnit.MINUTES);
System.out.println(&quot;fetching the token ---- &quot;+tokenManagerRunnable.fetchToken());
ds.setAccessToken(tokenManagerRunnable.fetchToken());
try {
Connection connection = ds.getConnection(); 
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(&quot; select * from [dbo].[CLIENT]&quot;); 
System.out.println(&quot;You have successfully logged in&quot;);
while(rs.next()) {
System.out.println(rs.getString(1));
}
}catch(Exception ex) {
ex.printStackTrace();
}

tokenManagerRunnable.fetchToken() brings null as the TokenManagerRunnable class is not yet executed.

How can we achieve wait till ScheduledExecutorService complete the task so we can get the value and set the new value in Datasource after every 45 minutes from tokenManagerRunnable.fetchToken() instead of null?

Any thoughts?

答案1

得分: 0

这,正如您已经了解的,是一个同步的问题。
您主要有两种方法来同步线程:

  • 使用join进行同步,
  • 使用回调进行异步。

根据重复任务的异步特性,我会建议您最好使用回调。
这样,您将能够在检索到令牌时设置新的令牌值。

我已更新您的代码如下:

// 导入 [...]

public class TokenManagerRunnable implements Runnable {
    private final SQLServerDataSource ds;

    /**
     * 采用数据源的新构造方法,以便在检索时触发令牌的持久化。
     * @param ds
     */
    public TokenManagerRunnable(SQLServerDataSource ds) {
        this.ds = ds;
    }

    /**
     * 在检索时将令牌持久化到数据源的新方法。
     * @param token
     */
    private void setToken(String token) {
        System.out.println("fetching the token ---- " + token);
        this.ds.setAccessToken(token);
    }

    @Override
    public void run() {
        // 检索 [...]
        try {
            // 解析 [...]
            // 更新令牌
            this.setToken(result.substring(1, result.length() - 1));

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

如您所见,Runner不需要任何状态,因为它将直接将结果流式传输到数据源。
您只需要在构造时将此数据源提供给Runner。

SQLServerDataSource ds = new SQLServerDataSource();
TokenManagerRunnable tokenManagerRunnable = new TokenManagerRunnable(ds);

// 同步运行1次,在此运行调用结束时,令牌将被设置
tokenManagerRunnable.run();
        
// 从现在开始,每隔45分钟调度一次令牌刷新
ScheduledExecutorService sches = Executors.newScheduledThreadPool(1);
sches.scheduleWithFixedDelay(tokenManagerRunnable, 0, 45, TimeUnit.MINUTES);

// 使用令牌 [...]

编辑

正如您在评论中所述,您需要在能够联系数据库之前执行Runnable
您可以选择同步执行此操作,或将下面的代码添加到单独的线程中,具体取决于您的应用程序意图。在此之下的问题是 您的应用程序是否依赖于此初始化?

事实上,您可以在不将其放入线程中的情况下调用run()方法,这将仅同步运行令牌更新。
这意味着在启动计划的线程执行自动刷新之前,您需要同步地调用tokenManagerRunnable.run();

英文:

This is, as you already know, a matter of synchronization.
You have mainly two ways of synchronizing threads:

  • synchronous using join,
  • asynchronous using callbacks.

From the asynchronous nature of the repetitive task, I would say that your best bet is using callbacks.
This way you will be able to set the new token value whenever you retrieve it.

I updated your code below:

// Imports [...]

public class TokenManagerRunnable implements Runnable {
    private final SQLServerDataSource ds;

    /**
     * New constructor taking a datasource to trigger the persistence of the token
     * on retrieval.
     * @param ds
     */
    public TokenManagerRunnable(SQLServerDataSource ds) {
        this.ds = ds;
    }

    /**
     * New method persisting the token on datasource when retrieved.
     * @param token
     */
    private void setToken(String token) {
        System.out.println(&quot;fetching the token ---- &quot; + token);
        this.ds.setAccessToken(token);
    }

    @Override
    public void run() {
        // Retrieval [...]
        try {
            // Parsing [...]
            //updating the token
            this.setToken(result.substring(1, result.length() - 1));

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

As you can see, you won't need any state on the Runner, as it will directly stream the result to the datasource.
You only have to provide this datasource to the runner on construction.

SQLServerDataSource ds = new SQLServerDataSource();
TokenManagerRunnable tokenManagerRunnable = new TokenManagerRunnable(ds);

// Run 1 time synchronously, at the end of this run call
// the token will be set
tokenManagerRunnable.run();
        
// Schedule a token refresh every 45 minutes starting now
ScheduledExecutorService sches = Executors.newScheduledThreadPool(1);
sches.scheduleWithFixedDelay(tokenManagerRunnable, 0, 45, TimeUnit.MINUTES);

// Use the token [...]

Edit

As you stated in your comment, you need to execute the Runnable before being able to contact your database.
You need either to do this synchronously or add the code below in a separate thread, depending on what you intend to do with your application . The question underneath being is your application dependent on this initialization?.

As a matter of fact, you can call your run() method without putting it in a thread, and that will simply run synchronously your token update.
This means that you need to call tokenManagerRunnable.run(); synchronously before starting the automatic refresh in a scheduled thread execution.

答案2

得分: 0

如果我理解你的问题正确,你可以使用CompletableFuture。你将token包装在一个CompletableFuture中,然后在调度线程中完成它。由于CompletableFuture是一个Future,其他线程可以在上面等待结果。

以下是演示机制的基本实现。

import java.util.concurrent.CompletableFuture;

class Main {
    
    static CompletableFuture<String> token = new CompletableFuture<>();
    
    public static void main(String[] args) {
        new Thread(() -> {
            for (int i = 0; i < 100_000_000; i++) {
                Math.log(Math.sqrt(i));
                if (i % 10_000_000 == 0) {
                    System.out.println("doing stuff");
                }
            }
            token.complete("result");
        }).start();
        
        String result = token.join(); // 等待直到token被设置并获取结果
        System.out.println("获取到结果:" + result);
    }
}

请记住,在获得结果后,你需要为token分配一个新的CompletableFuture。这是因为它们只能被完成一次。

英文:

If I get your question right, you can just use a CompletableFuture. You wrapp token in a CompletableFuture and the schedule-thread completes it. As CompletableFuture is a Future, other threads can just wait on it for the result.

Here is a basic implementation that illustrates the mechanism.

import java.util.concurrent.CompletableFuture;

class Main {
	
	static CompletableFuture&lt;String&gt; token = new CompletableFuture&lt;&gt;();
	
	
	public static void main(String[] args) {
	    new Thread(() -&gt; {
	    	for (int i = 0; i &lt; 100_000_000; i++) {
	    		Math.log(Math.sqrt(i));
	    		if (i % 10_000_000 == 0) {
	    			System.out.println(&quot;doing stuff&quot;);
	    		}
	    	}
	    	token.complete(&quot;result&quot;);
	    }).start();
	    
	    String result = token.join(); // wait until token is set and get it
	    System.out.println(&quot;got &quot; + result);
	}
}

Keep in mind that you'll have to assign token a new CompletableFuture after you get the result. That's because they can be completed only once.

答案3

得分: 0

以下是翻译好的代码部分:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.microsoft.sqlserver.jdbc.SQLServerDataSource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;

import java.util.ArrayList;
import java.util.List;

public class AzureServicePrincipleTokenManager implements Runnable {

    SQLServerDataSource ds ;
    String secret;
    String clientId;
    String tenetId;
    static final String RESOURCE = "https://database.windows.net";
    static final String ENDPOINT = "https://login.microsoftonline.com/";
    static final String GRANT_TYPE = "client_credentials";
    boolean confirmTokenFlag=false;
    private static Log logger = LogFactory.getLog("AzureServicePrincipleTokenManager");

    public AzureServicePrincipleTokenManager(SQLServerDataSource ds, String tenetId, String clientId, String secret) {
        this.ds = ds;
        this.secret = secret;
        this.clientId = clientId;
        this.tenetId = tenetId;
    }

    public boolean getConfirmTokenFlag(){
        return this.confirmTokenFlag;
    }

    private void setAccessToken(String token){
        this.ds.setAccessToken(token);
    }

    @Override
    public void run() {
        logger.info("Fetching Service Principle accessToken");
        String result = "";
        HttpPost post = new HttpPost(ENDPOINT+this.tenetId+"/oauth2/token");
        List<NameValuePair> urlParameters = new ArrayList<>();
        urlParameters.add(new BasicNameValuePair("grant_type", GRANT_TYPE));
        urlParameters.add(new BasicNameValuePair("client_id", this.clientId));
        urlParameters.add(new BasicNameValuePair("client_secret", this.secret));
        urlParameters.add(new BasicNameValuePair("resource", RESOURCE));

        try{
            post.setEntity(new UrlEncodedFormEntity(urlParameters));
            CloseableHttpClient httpClient = HttpClients.createDefault();
            CloseableHttpResponse response = httpClient.execute(post);
            result = EntityUtils.toString(response.getEntity());
            ObjectNode node = new ObjectMapper().readValue(result, ObjectNode.class);

            if (node.has("access_token")) {
                result = node.get("access_token").toString();
            }

        }catch (Exception ex){
            logger.error(ex.getMessage(),ex);
        }

        this.setAccessToken(result.substring(1, result.length()-1));
        confirmTokenFlag=true;
        logger.info("set confirmTokenFlag true");
    }
}

调用者部分:

SQLServerDataSource ds = new SQLServerDataSource();

AzureServicePrincipleTokenManager azureServicePrincipleTokenManager = new AzureServicePrincipleTokenManager(ds, "your tenetID", "your clientID", "your secret");
ScheduledExecutorService sches = Executors.newScheduledThreadPool(1);
sches.scheduleWithFixedDelay(azureServicePrincipleTokenManager, 0, 45, TimeUnit.MINUTES);
logger.info("----ExecuterService started the Runnable task");

while (azureServicePrincipleTokenManager.getConfirmTokenFlag() != true) {
    ds.getAccessToken(); //I wonder If i leave while body balnk it never become true. so intentionally i'm calling ds.getAccessToken();
}
logger.info("----get the token after setting up " + ds.getAccessToken());
英文:

I was able to achieve using following code

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.microsoft.sqlserver.jdbc.SQLServerDataSource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import java.util.ArrayList;
import java.util.List;
public class AzureServicePrincipleTokenManager implements Runnable {
SQLServerDataSource ds ;
String secret;
String clientId;
String tenetId;
static final String RESOURCE = &quot;https://database.windows.net&quot;;
static final String ENDPOINT = &quot;https://login.microsoftonline.com/&quot;;
static final String GRANT_TYPE = &quot;client_credentials&quot;;
boolean confirmTokenFlag=false;
private static Log logger = LogFactory.getLog( &quot;AzureServicePrincipleTokenManager&quot; );
public AzureServicePrincipleTokenManager(SQLServerDataSource ds, String tenetId, String clientId, String secret) {
this.ds = ds;
this.secret = secret;
this.clientId = clientId;
this.tenetId = tenetId;
}
public boolean getConfirmTokenFlag(){
return this.confirmTokenFlag;
}
private void setAccessToken(String token){
this.ds.setAccessToken(token);
}
@Override
public void run() {
logger.info(&quot;Fetching Service Principle accessToken&quot;);
String result = &quot;&quot;;
HttpPost post = new HttpPost(ENDPOINT+this.tenetId+&quot;/oauth2/token&quot;);
List&lt;NameValuePair&gt; urlParameters = new ArrayList&lt;&gt;();
urlParameters.add(new BasicNameValuePair(&quot;grant_type&quot;, GRANT_TYPE));
urlParameters.add(new BasicNameValuePair(&quot;client_id&quot;, this.clientId));
urlParameters.add(new BasicNameValuePair(&quot;client_secret&quot;, this.secret));
urlParameters.add(new BasicNameValuePair(&quot;resource&quot;, RESOURCE));
try{
post.setEntity(new UrlEncodedFormEntity(urlParameters));
CloseableHttpClient httpClient = HttpClients.createDefault();
CloseableHttpResponse response = httpClient.execute(post);
result = EntityUtils.toString(response.getEntity());
ObjectNode node = new ObjectMapper().readValue(result, ObjectNode.class);
if (node.has(&quot;access_token&quot;)) {
result = node.get(&quot;access_token&quot;).toString();
}
}catch (Exception ex){
logger.error(ex.getMessage(),ex);
}
this.setAccessToken(result.substring(1, result.length()-1));
confirmTokenFlag=true;
logger.info(&quot;set confirmTokenFlag true&quot;);
}
} 

And the caller would be like this

        SQLServerDataSource ds = new SQLServerDataSource();
AzureServicePrincipleTokenManager azureServicePrincipleTokenManager = new AzureServicePrincipleTokenManager(ds,&quot;your tenentID&quot;,&quot;your clientID&quot;,&quot;your secret&quot;);
ScheduledExecutorService sches = Executors.newScheduledThreadPool(1);
sches.scheduleWithFixedDelay(azureServicePrincipleTokenManager, 0, 45, TimeUnit.MINUTES);
logger.info(&quot;----ExecuterService started the Runnable task&quot;);
while (azureServicePrincipleTokenManager.getConfirmTokenFlag()!=true){
ds.getAccessToken(); //I wonder If i leave while body balnk it never become true. so intentionally i&#39;m calling ds.getAccessToken();
}
logger.info(&quot;----get the token after settingup  &quot;+ds.getAccessToken());

huangapple
  • 本文由 发表于 2020年7月24日 01:03:45
  • 转载请务必保留本文链接:https://go.coder-hub.com/63059534.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定