jdk: 1.8
网上帖子千千万,咱也一起炒冷饭.
CompletableFuture是用于构建异步编程的基础,同时它继承自Future和CompletionStage, CompletionStage这个接口是一个promise,它表示这个计算最终会完成。
1.不能手动设置操作完成
e.g. 开单独的线程通过rpc调一个接口返回一些数据,假如下游接口挂了,那么想通过返回上次缓存住的数据来进行手动完成,这时候Future没有这样的api支持
2.只能阻塞式的操作Future的返回结果
e.g 意思就是当结果可用的时候,Future不会主动通知你,你只能通过调用get()方法阻塞的等待返回结果。你没办法给Future设置一个回调,当结构可用的时候自动调用这个回调。
3.多个Future没办法进行链式调用
e.g 比如有2个耗时的计算(A,B),并且他们有依赖关系,当A计算完成,需要传递给B进行继续计算。Future不支持异步的流式计算调用。
4.不支持组合多个Future
e.g 比如有5个并发支持的Future,我们想在他们都执行完成后在做一些额外操作,Future不支持。
@Test
public void test1() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>(); //step1
new Thread(new CompleteFutureTask(completableFuture)).start(); //step2
String result = completableFuture.get(); //step3
System.out.println(Thread.currentThread().getName() + " get result:" + result);
}
class CompleteFutureTask implements Runnable {
CompletableFuture<String> completableFuture;
public CompleteFutureTask(CompletableFuture<String> completableFuture) {
this.completableFuture = completableFuture;
}
@Override
public void run() {
MixAll.simulateComputeCost();
System.out.println(Thread.currentThread().getName() + " will complete future:" + completableFuture);
completableFuture.complete("hello completableFuture"); //step4
//completableFuture.complete("hello completableFuture"); //step5
}
}
step1 最简单的方式创建一个CompletableFuture, 如果没有step2, 那么位于step3的get()方法会一直阻塞, 因为计算一直没有完成.
同时step4 的多次调用是无效的, 即step5是无效的
示例代码
@Test
public void runAsyncTest() throws ExecutionException, InterruptedException {
CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
MixAll.simulateComputeCost();
System.out.println(Thread.currentThread().getName() + " runAsyncTest...................");
});
cf.get(); //step1
//MixAll.simulateComputeCost(8); //step2
}
由于这个是在junit里面进行的,所以需要等待它执行完成,等待的方式有2种: step1和step2. 不要误认为必须调用get()方法 该方法有2个重载方法
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
方法1使用的是ForkJoinPool的线程池, 方法2可以传入自定义的线程池
示例代码
@Test
public void supplyAsyncTest() throws ExecutionException, InterruptedException {
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
MixAll.simulateComputeCost();
MixAll.printWithThread(" will return something");
return "hello supplyAsync";
});
String data = cf.get();
MixAll.printWithThread(" get data:" + data);
}
依然有2个重载方法,详见api, 线程池说明同上
上面的get()是阻塞的, 他会一直阻塞直到执行完成,这就是最开始我们说的Future的不足,这不是我们想要的.
我们需要的是可以设置一个回调,等执行完成后自动进行调用, 下面我们看下这几个方法.
1.thenApply()方法,它接受一个Function作为入参,同时返回带参的CompletableFuture
示例代码
@Test
public void thenApplyTest() throws ExecutionException, InterruptedException {
CompletableFuture<String> cf = CompletableFuture
.supplyAsync(() -> {
MixAll.simulateComputeCost();
MixAll.printWithThread(" will return something");
return "hello supplyAsync";
})
.thenApply(info -> {
MixAll.simulateComputeCost();
MixAll.printWithThread(" will return something");
return "thenApply|" + info;
});
String data = cf.get();
MixAll.printWithThread(" get>>" + data);
}
supplyAsync执行完成后,自动调用thenApply()方法
执行thenApply里面的逻辑的线程和执行supplyAsync逻辑的是同一个.
2.thenAccept(),返回CompletableFuture
@Test
public void thenAcceptTest() {
CompletableFuture.supplyAsync(() -> ProductService.Instance.getProduct(12))
.thenAccept(product -> MixAll.printWithThread(" thenAccept get productName:" + product.getName()));
//等待执行完成
MixAll.simulateComputeCost(8);
}
3.thenRun(), 返回CompletableFuture
@Test
public void thenRunTest() {
long start = System.currentTimeMillis();
CompletableFuture
.runAsync(() -> ProductService.Instance.getProduct(12))
.thenRun(() -> MixAll.printWithThread(" thenRun cost:" + (System.currentTimeMillis() - start) + "ms"));
//等待执行完成
MixAll.simulateComputeCost(8);
}
这3个方法都有另外2个后缀是Async的重载方法,以thenApply为例
thenApplyAsync(Function<? super T,? extends U> fn) //method1
thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) //method2
method1在方法内部使用了ForkJoinPool.commonPool(),在测试过程中发现执行thenApplyAsync的逻辑和执行supplyAsync的线程依然是同一个,猜测是又在线程池里拿到了同一个,但是好巧啊…
method2使用了传递的executor进行执行
canal: 1.1.3
rocketmq: 4.3.2
No route info for this topic, XXX topic
根据错误日志提示,查到源码。定位到在下面的方法
1)DefaultMQProducerImpl.sendSelectImpl 这个方法
2)一路debug到MQClientInstance.updateTopicRouteInfoFromNameServer
3)想要找到使用nameserver的地址的地方,发现地址都是直接取的,没有分隔的地方,应该方向错误了
转换思路,可能是由于在启动的时候set nameserver地址的时候出的问题, 下面进行验证
1)DefaultMQProducerImpl看它的start方法
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
继续往下
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
继续往下debug
if (this.clientConfig.getNamesrvAddr() != null) {
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
}
发现根源
public void updateNameServerAddressList(final String addrs) {
String[] addrArray = addrs.split(";");
List<String> list = Arrays.asList(addrArray);
this.remotingClient.updateNameServerAddressList(list);
}
nameserver的地址是用;分隔的,但是canal里面配置的nameserver是通过,分隔的
导致实际上addrArray[0]是这样一个地址”ip1:port,ip2:port”,这是一个无效的地址。
导致后续使用的时候无法连接到nameserver获取route信息报错
springboot: 2.0.3
java: 1.8
1.加载properties或者yml配置文件的类ConfigFileApplicationListener
2.org.springframework.core.Ordered 接口的顺序是值越小, 顺序越提前
1.基于rocketmq做了一个刷新缓存的工具,为了保证分组相同,第一版本取的hostname作为consumer group
2.部署后发现机器的hostname带 “.”,这是运维的规范
3.改进, 继承ConfigFileApplicationListener, 顺序比它大, 逻辑是取spring.cloud.client.hostname,然后把”.”替换成”-“,然后使用一个新key放入env,然后consumer group使用新key
4.详见cache-refresh项目
java: 1.8
@Test
public void cycleNode() {
Node head = new Node(1);
Node lhead = head;
int len = 6;
for (int i = 2; i <= len; i++) {
Node n = new Node(i);
lhead.setNext(n);
lhead = n;
}
int mid = len / 2;
int mod = len % 2;
int step = mod == 0 ? mid : (mid + 1);
Node rpre = null;
Node rhead = null;
lhead = head;
while (step > 0) {
step--;
lhead = lhead.getNext();
if (step == 1) {//最后一步
rpre = lhead;
rhead = rpre.getNext();
}
}
lhead = head;
//当是偶数数量时, lhead 和 rpre重合时,则剩余最后的Ln,Rn
while (rhead != null && lhead != rpre) {
Node lnext = lhead.getNext();
lhead.setNext(rhead);
rpre.setNext(rhead.getNext());
rhead.setNext(lnext);
rhead = rpre.getNext();
if (rhead != null) {
lhead = lhead.getNext().getNext();
}
}
lhead = head;
while (lhead != null) {
System.out.print("node:" + lhead.getVal() + " 's next==>");
lhead = lhead.getNext();
}
}
grafna: 6.1.0
redis_exporter: 0.30.0
prometheus: 2.8.1
1.下载redis_exporter,这是一个可执行文件,查看帮忙文档 ./redis_exporter -h
2.监听redis cluster, 命令 ./redis_exporter -redis.addr ip1:7001,ip2:7002,ip3:7003 -redis.password xxx
3.在prometheus服务的prometheus.yml里面添加如下信息,然后重启
- job_name: redis
static_configs:
- targets: ['redis_exporter_ip:9121']
labels:
instance: redis-cluster
4.grafna 的prometheus redis json格式模板,https://grafana.com/dashboards/763
5.导入改模板,可以直接使用763模板号,或者把json贴到grafna里面