分布式事务seata通过feign组件不能传递XID解决
不知道feign组件怎么创建可以看看这篇博客:https://www.yanbin.host/article/40
一般来说,xid不能通过feign在上下文中传播,主要原因是我们在feign组件里开启了熔断器(Hystrix)。熔断器开启后默认采用线程隔离模式,feign调用会在熔断器线程池的线程中执行,而xid保存在调用方线程的上下文里,所以xid丢失了。
需要把xid放到feign请求的头部里面,就可以了。
废话不多说
引入maven包
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-core</artifactId>
<version>1.4.0</version>
</dependency>
重写HystrixConcurrencyStrategy
package com.fccn.feign.wechat.config;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariable;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableLifecycle;
import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier;
import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy;
import com.netflix.hystrix.strategy.properties.HystrixProperty;
import io.seata.core.context.RootContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Hystrix concurrency strategy that carries the caller's Spring {@link RequestAttributes} and
 * the Seata global transaction id (XID) over to the thread that executes a Hystrix command.
 *
 * <p>On construction the instance registers itself as the Hystrix concurrency strategy while
 * re-registering the previously configured event notifier, metrics publisher, properties
 * strategy and command execution hook. Thread-pool, queue and request-variable creation is
 * delegated to the strategy that was registered before this one.
 *
 * <p>Author: 木杉 (819236727@qq.com), 2021/1/25
 */
@Component
public class RequestAttributeHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {

    private final Logger log = LoggerFactory.getLogger(getClass());

    /** Strategy that was registered before this one; may be null if the lookup failed. */
    private HystrixConcurrencyStrategy delegate;

    /**
     * Captures the currently registered Hystrix plugins, resets the plugin registry and
     * registers this instance as the concurrency strategy together with the captured plugins.
     * Failures are logged instead of being propagated so that bean creation still succeeds.
     */
    public RequestAttributeHystrixConcurrencyStrategy() {
        try {
            this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
            if (this.delegate instanceof RequestAttributeHystrixConcurrencyStrategy) {
                // A strategy of this type is already registered; nothing more to do.
                return;
            }
            HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins
                    .getInstance().getCommandExecutionHook();
            HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance()
                    .getEventNotifier();
            HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance()
                    .getMetricsPublisher();
            HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance()
                    .getPropertiesStrategy();
            this.logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher,
                    propertiesStrategy);
            // The registry only accepts one registration per plugin type, so it is reset
            // and every previously captured plugin is registered again.
            HystrixPlugins.reset();
            HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
            HystrixPlugins.getInstance()
                    .registerCommandExecutionHook(commandExecutionHook);
            HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
            HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
            HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
        } catch (Exception e) {
            // Previously swallowed silently; surface the failure so a missing XID can be diagnosed.
            log.error("Failed to register RequestAttributeHystrixConcurrencyStrategy as Hystrix concurrency strategy", e);
        }
    }

    /**
     * Logs the plugins that were registered before this strategy replaced them (debug level only).
     *
     * @param eventNotifier      currently registered event notifier
     * @param metricsPublisher   currently registered metrics publisher
     * @param propertiesStrategy currently registered properties strategy
     */
    private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier,
                                                 HystrixMetricsPublisher metricsPublisher,
                                                 HystrixPropertiesStrategy propertiesStrategy) {
        if (log.isDebugEnabled()) {
            log.debug("Current Hystrix plugins configuration is ["
                    + "concurrencyStrategy [" + this.delegate + "]," + "eventNotifier ["
                    + eventNotifier + "]," + "metricPublisher [" + metricsPublisher + "],"
                    + "propertiesStrategy [" + propertiesStrategy + "]," + "]");
            log.debug("Registering Sleuth Hystrix Concurrency Strategy.");
        }
    }

    /**
     * Wraps the callable so that it runs with the request attributes and XID of the thread
     * that calls this method.
     *
     * @param callable the work Hystrix is about to schedule
     * @return a callable that installs the captured context, runs {@code callable}, and
     *         cleans the context up afterwards
     */
    @Override
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
        String xid = RootContext.getXID();
        // Outside of a web request there are no request attributes; guard against the NPE.
        if (requestAttributes != null) {
            if (xid != null) {
                requestAttributes.setAttribute(RootContext.KEY_XID, xid, RequestAttributes.SCOPE_REQUEST);
            } else {
                requestAttributes.removeAttribute(RootContext.KEY_XID, RequestAttributes.SCOPE_REQUEST);
            }
        }
        return new WrappedCallable<>(callable, requestAttributes, xid);
    }

    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
                                            HystrixProperty<Integer> corePoolSize,
                                            HystrixProperty<Integer> maximumPoolSize,
                                            HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
                                            BlockingQueue<Runnable> workQueue) {
        if (this.delegate == null) {
            // Lookup of the previous strategy failed in the constructor; use the Hystrix default.
            return super.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
                    keepAliveTime, unit, workQueue);
        }
        return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
                keepAliveTime, unit, workQueue);
    }

    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
                                            HystrixThreadPoolProperties threadPoolProperties) {
        if (this.delegate == null) {
            return super.getThreadPool(threadPoolKey, threadPoolProperties);
        }
        return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties);
    }

    @Override
    public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
        if (this.delegate == null) {
            return super.getBlockingQueue(maxQueueSize);
        }
        return this.delegate.getBlockingQueue(maxQueueSize);
    }

    @Override
    public <T> HystrixRequestVariable<T> getRequestVariable(
            HystrixRequestVariableLifecycle<T> rv) {
        if (this.delegate == null) {
            return super.getRequestVariable(rv);
        }
        return this.delegate.getRequestVariable(rv);
    }

    /**
     * Callable decorator that installs captured request attributes (and, when present, the
     * captured XID) on the executing thread for the duration of the wrapped call.
     */
    static class WrappedCallable<T> implements Callable<T> {

        private final Callable<T> target;
        private final RequestAttributes requestAttributes;
        /** XID captured on the submitting thread; null when no global transaction was active. */
        private final String xid;

        /**
         * @param target            the callable to run
         * @param requestAttributes request attributes to expose while {@code target} runs
         */
        public WrappedCallable(Callable<T> target, RequestAttributes requestAttributes) {
            this(target, requestAttributes, null);
        }

        /**
         * @param target            the callable to run
         * @param requestAttributes request attributes to expose while {@code target} runs; may be null
         * @param xid               XID to bind to {@link RootContext} while {@code target} runs; may be null
         */
        public WrappedCallable(Callable<T> target, RequestAttributes requestAttributes, String xid) {
            this.target = target;
            this.requestAttributes = requestAttributes;
            this.xid = xid;
        }

        @Override
        public T call() throws Exception {
            // Remember what the executing thread had bound so it can be restored afterwards.
            String previousXid = RootContext.getXID();
            boolean rebind = xid != null && !xid.equals(previousXid);
            try {
                RequestContextHolder.setRequestAttributes(requestAttributes);
                if (rebind) {
                    RootContext.bind(xid);
                }
                return target.call();
            } finally {
                if (rebind) {
                    RootContext.unbind();
                    if (previousXid != null) {
                        RootContext.bind(previousXid);
                    }
                }
                RequestContextHolder.resetRequestAttributes();
            }
        }
    }
}
package com.fccn.feign.wechat.config;
import com.fccn.feign.wechat.PayWeChatFeign;
import com.fccn.feign.wechat.fallBack.HystrixFallBack;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import io.seata.core.context.RootContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
import java.util.*;
/**
 * Feign configuration that also acts as a {@link RequestInterceptor}: it copies the headers of
 * the current servlet request onto every outgoing Feign request and adds the Seata global
 * transaction id (XID) under {@link RootContext#KEY_XID}.
 *
 * <p>Author: 木杉 (819236727@qq.com), created 2020/8/11
 */
@Configuration
@EnableFeignClients(basePackageClasses = PayWeChatFeign.class)
public class FeignClientsConfiguration implements RequestInterceptor {

    /** Injected from {@code fccn.config.fcAppToken}; not read anywhere in this class. */
    @Value("${fccn.config.fcAppToken}")
    private String token;

    @Bean
    public HystrixFallBack weChatHystrixFallBack() {
        return new HystrixFallBack();
    }

    @Bean
    public RequestAttributeHystrixConcurrencyStrategy requestAttributeHystrixConcurrencyStrategy() {
        return new RequestAttributeHystrixConcurrencyStrategy();
    }

    /**
     * Copies the inbound request headers and the XID onto the outgoing Feign request.
     * When no servlet request is bound to the current thread, only the XID found in
     * {@link RootContext} (if any) is forwarded.
     *
     * @param template the outgoing Feign request being built
     */
    @Override
    public void apply(RequestTemplate template) {
        HttpServletRequest request = getHttpServletRequest();
        if (request == null) {
            // No web request on this thread: there are no headers to copy, but a global
            // transaction may still be active and must not be dropped.
            String xid = RootContext.getXID();
            if (xid != null && !xid.isEmpty()) {
                template.header(RootContext.KEY_XID, xid);
            }
            return;
        }
        // NOTE(review): every inbound header is forwarded as-is, including body-related ones
        // such as Content-Length — confirm this is intended for requests with a different body.
        for (Map.Entry<String, String> header : getHeaders(request).entrySet()) {
            template.header(header.getKey(), header.getValue());
        }
    }

    /**
     * Looks up the servlet request bound to the current thread.
     *
     * @return the current request, or {@code null} when none is bound or the bound
     *         attributes are not servlet-based
     */
    private HttpServletRequest getHttpServletRequest() {
        RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
        if (attributes instanceof ServletRequestAttributes) {
            return ((ServletRequestAttributes) attributes).getRequest();
        }
        return null;
    }

    /**
     * Collects the headers of {@code request} in iteration order and appends the XID.
     * The XID is taken from the request attribute {@link RootContext#KEY_XID} and, when that
     * is absent, from the {@link RootContext} of the current thread.
     *
     * @param request the inbound servlet request
     * @return header name to value map, never {@code null}
     */
    private Map<String, String> getHeaders(HttpServletRequest request) {
        Map<String, String> headers = new LinkedHashMap<>();
        Enumeration<String> names = request.getHeaderNames();
        // The servlet API allows containers to return null here.
        while (names != null && names.hasMoreElements()) {
            String name = names.nextElement();
            headers.put(name, request.getHeader(name));
        }

        Object attribute = request.getAttribute(RootContext.KEY_XID);
        String xid = attribute instanceof String ? (String) attribute : null;
        if (xid == null || xid.isEmpty()) {
            xid = RootContext.getXID();
        }
        if (xid != null && !xid.isEmpty()) {
            headers.put(RootContext.KEY_XID, xid);
        }
        return headers;
    }
}
好了,再试试开启熔断之后,xid有没有通过feign在上下文进行传播
正文到此结束
热门推荐
该篇文章的评论功能已被站长关闭