Commit f0fbec4f by “zcwang”

Merge remote-tracking branch 'origin/feature-changeFlow-221110' into feature-changeFlow-221110

parents 98ce0bfa 29d25962
Pipeline #43499 passed with stages
in 1 minute 29 seconds
......@@ -3,26 +3,18 @@ package com.netease.mail.yanxuan.change.integration.flow.supplier;
import com.netease.mail.yanxuan.change.integration.flow.supplier.req.SupplierRelatedUserReq;
import com.netease.mail.yanxuan.change.integration.flow.supplier.rsp.SupplierUserRsp;
import com.netease.yanxuan.flowx.sdk.meta.controller.communal.AjaxResponse;
import com.netease.yanxuan.missa.client.annotation.MissaClient;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import java.util.List;
/**
* @author WangJiaXiang
* @date 2022/12/8/008$
*/
@Service
@MissaClient(serviceCode = "yanxuan-supplier-ms1")
public interface FlowRpcSupplierInfoService {
/**
* 查询供应商关联角色
*/
@PostMapping("/api/supplier/batchGetSupplierRelatedUser.json")
AjaxResponse<List<SupplierUserRsp>> querySupplierRelatedUser(@RequestBody SupplierRelatedUserReq req);
AjaxResponse<List<SupplierUserRsp>> querySupplierRelatedUser(SupplierRelatedUserReq req);
......
package com.netease.mail.yanxuan.change.integration.flow.supplier.impl;
import com.netease.mail.yanxuan.change.integration.flow.supplier.FlowRpcSupplierInfoService;
import com.netease.mail.yanxuan.change.integration.flow.supplier.req.SupplierRelatedUserReq;
import com.netease.mail.yanxuan.change.integration.flow.supplier.rsp.SupplierUserRsp;
import com.netease.mail.yanxuan.change.integration.rpc.RpcObjectHandler;
import com.netease.mail.yanxuan.change.integration.rpc.RpcTemplate;
import com.netease.yanxuan.flowx.sdk.meta.controller.communal.AjaxResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
/**
* @author WangJiaXiang
* @date 2022/12/8/008$
*/
@Service
public class FlowRpcSupplierInfoServiceImpl implements FlowRpcSupplierInfoService {
@Autowired
private RpcTemplate rpcTemplate;
Logger logger = LoggerFactory.getLogger(FlowRpcSupplierInfoServiceImpl.class);
@Override
public AjaxResponse<List<SupplierUserRsp>> querySupplierRelatedUser(SupplierRelatedUserReq req) {
String url = "http://127.0.0.1:8550/proxy/test-v1.yanxuan-supplier-ms.service.mailsaas/api/supplier/batchGetSupplierRelatedUser.json";
HashMap<String, Object> params = new HashMap<>();
params.put("searchType",req.getSearchType());
params.put("supplierIdList",req.getSupplierIdList());
Object supplierUserRsp = rpcTemplate.post(url, params,500,new RpcObjectHandler<>(Object.class));
logger.error("请求返回结果"+supplierUserRsp.toString());
return null;
}
}
package com.netease.mail.yanxuan.change.integration.rpc;
@FunctionalInterface
public interface Checker {
/**
* 校验结果是否成功
* @return
*/
boolean check();
}
\ No newline at end of file
package com.netease.mail.yanxuan.change.integration.rpc;
import org.apache.commons.lang3.StringUtils;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* @author lwtang
* @date 2019-02-19
*/
public class EncodeUtil {
private static final char[] CHARS = new char[] { '0', '1', '2', '3', '4', '5', '6', '7', '8',
'9', 'a', 'b', 'c', 'd', 'e', 'f' };
public static String encode(String text) {
try {
return URLEncoder.encode(text, "UTF-8");
} catch (Exception e) {
return text;
}
}
public static String encodeMap(Map<?, ?> map) {
List<String> lines = new ArrayList<>();
for (Map.Entry<?, ?> entry: map.entrySet()) {
lines.add(String.valueOf(entry.getKey()) + "=" + String.valueOf(entry.getValue()));
}
return StringUtils.join(lines, "&");
}
public static String decode(String text) {
try {
return URLDecoder.decode(text, "UTF-8");
} catch (Exception e) {
return text;
}
}
public static String ASCIIHex(String text) {
byte[] bytes = text.getBytes();
char[] chars = new char[bytes.length * 2];
for (int i = 0; i < bytes.length; i++) {
chars[2 * i] = CHARS[bytes[i] >> 4];
chars[2 * i + 1] = CHARS[bytes[i] % 16];
}
return new String(chars).toUpperCase();
}
}
package com.netease.mail.yanxuan.change.integration.rpc;
import com.netease.mail.yanxuan.change.integration.email.exception.RpcException;
import org.apache.http.Consts;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.config.*;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicHeader;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.CodingErrorAction;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* HttpClient 封装
*
* @author lwtang
* @date 2019-02-19
*/
public class HttpClientUtil {
private static final Logger LOG = LoggerFactory.getLogger(HttpClientUtil.class);
private static final int DEFAULT_TIME_OUT = 5000;
private static final String DEFAULT_ENCODING = "UTF-8";
private static PoolingHttpClientConnectionManager connManager = null;
private static CloseableHttpClient httpclient = null;
static {
try {
SSLContext sslContext = SSLContexts.custom().build();
sslContext.init(null, new TrustManager[] { new DefaultTrustManager() }, null);
Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder
.<ConnectionSocketFactory>create()
.register("http", PlainConnectionSocketFactory.INSTANCE)
.register("https", new SSLConnectionSocketFactory(sslContext)).build();
connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
httpclient = HttpClients.custom().setConnectionManager(connManager).build();
// Create socket configuration
SocketConfig socketConfig = SocketConfig.custom().setSoKeepAlive(true)
.setSoReuseAddress(true).setSoTimeout(DEFAULT_TIME_OUT).setTcpNoDelay(true).build();
connManager.setDefaultSocketConfig(socketConfig);
// Create message constraints
MessageConstraints messageConstraints = MessageConstraints.custom()
.setMaxHeaderCount(200).setMaxLineLength(2000).build();
// Create connection configuration
ConnectionConfig connectionConfig = ConnectionConfig.custom()
.setMalformedInputAction(CodingErrorAction.IGNORE)
.setUnmappableInputAction(CodingErrorAction.IGNORE).setCharset(Consts.UTF_8)
.setMessageConstraints(messageConstraints).build();
connManager.setDefaultConnectionConfig(connectionConfig);
connManager.setMaxTotal(300);
connManager.setDefaultMaxPerRoute(100);
} catch (KeyManagementException e) {
LOG.error("KeyManagementException", e);
} catch (NoSuchAlgorithmException e) {
LOG.error("NoSuchAlgorithmException", e);
}
}
public static String get(String url, Map<String, Object> params, Map<String, String> header,
int timeout) {
return doGet(url, params, header, DEFAULT_ENCODING, timeout);
}
public static String doGet(String url, Map<String, Object> params, Map<String, String> header,
String encoding, int timeout) {
String responseString = null;
HttpGet get = new HttpGet();
fillUrl(url, params, encoding, get);
fillHeader(header, get);
fillConfig(timeout, timeout, get);
try {
CloseableHttpResponse response = httpclient.execute(get);
try {
int resHttpCode = response.getStatusLine().getStatusCode();
if( resHttpCode != 200) {
throw new RpcStatusException(resHttpCode, "status code " + resHttpCode + " not equal 200");
}
HttpEntity entity = response.getEntity();
try {
if (entity != null) {
responseString = EntityUtils.toString(entity, encoding);
}
} finally {
if (entity != null) {
entity.getContent().close();
}
}
return responseString;
} catch (Exception e) {
LOG.error("[HttpUtils Get response error]", e);
throw e;
} finally {
if (response != null) {
response.close();
}
}
} catch (ConnectException e) {
throw new RpcConnectException("request connect failed!");
} catch (SocketTimeoutException e) {
throw new RpcTimeoutException("request timeout!");
} catch (IOException e) {
LOG.error("[query webservice error]", e);
} finally {
get.releaseConnection();
}
return responseString;
}
public static String postJson(String url, String params, int timeout) {
Map<String, String> header = new HashMap<>();
header.put("Content-type", ContentType.APPLICATION_JSON.getMimeType());
return doPost(url, new StringEntity(params, DEFAULT_ENCODING), header, DEFAULT_ENCODING,
timeout);
}
public static String post(String url, Map<String, Object> params, int timeout) {
Map<String, String> header = new HashMap<>();
header.put("Content-type", ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
HttpEntity httpEntity = null;
if (params != null && params.size() > 0) {
List<BasicNameValuePair> formParams = params.entrySet().stream()
.map(entry -> new BasicNameValuePair(entry.getKey(),
entry.getValue() == null ? "" : entry.getValue().toString()))
.collect(Collectors.toList());
try {
httpEntity = new UrlEncodedFormEntity(formParams, DEFAULT_ENCODING);
} catch (UnsupportedEncodingException e) {
throw new RpcException(e);
}
}
return doPost(url, httpEntity, header, DEFAULT_ENCODING, timeout);
}
public static String doPost(String url, HttpEntity params, Map<String, String> header,
String encoding, int timeout) {
String responseContent = null;
HttpPost post = new HttpPost(url);
fillHeader(header, post);
fillConfig(timeout, timeout, post);
post.setEntity(params);
try {
CloseableHttpResponse response = httpclient.execute(post);
try {
HttpEntity entity = response.getEntity();
responseContent = EntityUtils.toString(entity, encoding);
} finally {
if (response != null) {
response.close();
}
}
} catch (ConnectException e) {
throw new RpcConnectException("request connect failed!");
} catch (SocketTimeoutException e) {
throw new RpcTimeoutException("request timeout!");
} catch (IOException e) {
LOG.error("[query webservice error]", e);
} finally {
post.releaseConnection();
}
return responseContent;
}
/**
* concat url
*/
private static String fillUrl(String url, Map<String, Object> params, String encoding,
HttpRequestBase requestBase) {
StringBuilder sb = new StringBuilder();
sb.append(url);
int i = 0;
if (params != null && params.size() > 0) {
for (Map.Entry<String, Object> entry: params.entrySet()) {
if (i == 0 && !url.contains("?")) {
sb.append("?");
} else {
sb.append("&");
}
sb.append(entry.getKey());
sb.append("=");
Object v = entry.getValue();
String value = v == null ? "" : v.toString();
encoding = encoding == null ? Consts.UTF_8.name() : encoding;
try {
sb.append(URLEncoder.encode(value, encoding));
} catch (UnsupportedEncodingException e) {
LOG.warn("encoding common get params error, value={}", value, e);
try {
sb.append(URLEncoder.encode(value, encoding));
} catch (Exception ex) {
e.printStackTrace();
}
}
i++;
}
}
requestBase.setURI(URI.create(sb.toString()));
return sb.toString();
}
private static void fillHeader(Map<String, String> header, HttpRequestBase requestBase) {
if (header == null || header.size() == 0) {
return;
}
Header[] headers = header.entrySet().stream()
.map(entry -> new BasicHeader(entry.getKey(), entry.getValue()))
.collect(Collectors.toList()).toArray(new Header[header.size()]);
if (headers.length > 0) {
requestBase.setHeaders(headers);
}
}
/**
* fill common conf
*/
private static void fillConfig(int connectTimeout, int soTimeout, HttpRequestBase requestBase) {
if (connectTimeout <= 0) {
connectTimeout = DEFAULT_TIME_OUT;
}
if (soTimeout <= 0) {
soTimeout = DEFAULT_TIME_OUT;
}
RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(soTimeout)
.setConnectTimeout(connectTimeout).setConnectionRequestTimeout(connectTimeout).build();
requestBase.setConfig(requestConfig);
}
public static class DefaultTrustManager implements X509TrustManager {
@Override
public void checkClientTrusted(X509Certificate[] x509Certificates, String s)
throws CertificateException {}
@Override
public void checkServerTrusted(X509Certificate[] x509Certificates, String s)
throws CertificateException {}
@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
}
}
package com.netease.mail.yanxuan.change.integration.rpc;
import com.netease.mail.yanxuan.change.integration.email.exception.RpcException;
public class RpcConnectException extends RpcException {
public RpcConnectException(String message) {
super(message);
}
}
package com.netease.mail.yanxuan.change.integration.rpc;
public class RpcException extends RuntimeException {
public RpcException(String message) {
super(message);
}
public RpcException(Throwable t) {
super(t);
}
public RpcException(String message, Throwable t) {
super(message, t);
}
}
\ No newline at end of file
package com.netease.mail.yanxuan.change.integration.rpc;
import com.alibaba.fastjson.JSON;
import java.io.IOException;
public class RpcObjectHandler<T> implements RpcTemplate.CallHandler<T> {
private Class<T> tClass;
public RpcObjectHandler(Class<T> tClass) {this.tClass = tClass;}
@Override
public T handle(String resp) throws IOException {
return JSON.parseObject(resp, tClass);
}
}
\ No newline at end of file
package com.netease.mail.yanxuan.change.integration.rpc;
import com.netease.mail.yanxuan.change.integration.email.exception.RpcException;
public class RpcStatusException extends RpcException {
private int code;
public RpcStatusException(int code, String message) {
super(message);
this.code = code;
}
public int getCode() {
return this.code;
}
}
\ No newline at end of file
package com.netease.mail.yanxuan.change.integration.rpc;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.parser.ParserConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* rpc 请求封装,在 HttpClientUtil 基础上加入了返回值处理和日志
*
* @author lwtang
* @date 2019-02-19
*/
@Component
public class RpcTemplate {
@Value("${act.rpc.get.retry:1}")
private int rpcGetRetry;
@Value("${act.rpc.post.retry:0}")
private int rpcPostRetry;
Logger logger = LoggerFactory.getLogger(RpcTemplate.class);
public RpcTemplate() {
ParserConfig.getGlobalInstance().setAutoTypeSupport(true);
}
/**
* 校验resp
*
* @param t
* @param <T>
* @return
*/
private <T> RpcStatus checkResp(CallHandler<T> handler, T t) {
RpcStatus status = RpcStatus.SUCCESS;
try {
// 为空,或handlerCheck为false直接判断FAIL
if (t == null || !handler.check(t)) {
return RpcStatus.WARNING;
}
if (t instanceof Checker) {
status = ((Checker) t).check() ? RpcStatus.SUCCESS : RpcStatus.WARNING;
} else if (t instanceof List) {
// 为空认为不是理想结果
if (CollectionUtils.isEmpty((List) t)) {
status = RpcStatus.WARNING;
} else {
// 列表数据简单检测第一个元素
Object obj = ((List) t).get(0);
if (obj instanceof Checker) {
status = ((Checker) obj).check() ? RpcStatus.SUCCESS : RpcStatus.WARNING;
}
}
}
} catch (Exception e) {
logger.error("collect error, msg={}", e.getMessage());
}
return status;
}
private String convertParam(Object param) {
if (param instanceof String) {
return (String) param;
}
String reqParam = "";
if (param instanceof Map) {
reqParam = EncodeUtil.encodeMap((Map) param);
} else if (param != null) {
reqParam = JSON.toJSONString(param);
}
return reqParam;
}
private <T> T execute(String url, Object params, CallHandler<T> handler,
CallTemplate callTemplate, int retryTime, Method method) {
int tryTimes = 0;
RpcException rpcException = null;
String respBody = null;
int statusCode = 200;
while (tryTimes <= retryTime) {
long startTime = System.currentTimeMillis();
try {
respBody = callTemplate.executeCall();
if (StringUtils.isEmpty(respBody)) {
throw new RpcException("response empty");
}
T t = handler.handle(respBody);
return t;
} catch (RpcTimeoutException e) {
} catch (RpcStatusException e) {
statusCode = e.getCode();
} catch (RpcException e) {
rpcException = e;
} catch (Exception e) {
rpcException = new RpcException(e);
} finally {
if (logger.isDebugEnabled()) {
long costTime = System.currentTimeMillis() - startTime;
String reqParam = convertParam(params);
logger.debug(
"process request, url={}, method={}, params={}, statusCode = {}, response={}, cost={}ms",
url, method.name(), reqParam, statusCode, respBody, costTime);
}
}
tryTimes++;
}
logger.error("request error, url={}, params={}, statusCode = {}, respBody={}", url,
convertParam(params), statusCode, respBody, rpcException);
if (rpcException == null) {
rpcException = new RpcException("服务异常");
}
throw rpcException;
}
/**
* Get 请求
*
* @param url
* @param params
* @param header
* @param timeout
* -1 取默认的超时时间
* @param handler
* @param <T>
* @return
*/
public <T> T get(String url, Map<String, Object> params, Map<String, String> header,
int timeout, CallHandler<T> handler, Integer retryTimes) {
CallTemplate template = () -> HttpClientUtil.get(url, params, header, timeout);
return this.execute(url, params, handler, template, retryTimes, Method.GET);
}
public <T> T get(String url, Map<String, Object> params, CallHandler<T> handler) {
return get(url, params, null, -1, handler, rpcGetRetry);
}
/**
* Post请求(url encoded)
*
* @param url
* @param params
* @param timeout
* @param handler
* @param <T>
* @return
*/
public <T> T post(String url, Map<String, Object> params, int timeout, CallHandler<T> handler,
Integer retryTime) {
CallTemplate template = () -> HttpClientUtil.post(url, params, timeout);
return this.execute(url, params, handler, template, retryTime, Method.POST);
}
/**
* Post请求(url encoded)
*
* @param url
* @param params
* @param timeout
* @param handler
* @param <T>
* @return
*/
public <T> T post(String url, Map<String, Object> params, int timeout, CallHandler<T> handler) {
CallTemplate template = () -> HttpClientUtil.post(url, params, timeout);
return this.execute(url, params, handler, template, rpcPostRetry, Method.POST);
}
/**
* Post Json请求
*
* @param url
* @param params
* @param timeout
* @param handler
* @param <T>
* @return
*/
public <T> T postJson(String url, String params, int timeout, CallHandler<T> handler,
Integer retryTime) {
CallTemplate template = () -> HttpClientUtil.postJson(url, params, timeout);
return this.execute(url, params, handler, template, retryTime, Method.POST);
}
/**
* Post Json请求
*
* @param url
* @param params
* @param timeout
* @param handler
* @param <T>
* @return
*/
public <T> T postJson(String url, String params, int timeout, CallHandler<T> handler) {
CallTemplate template = () -> HttpClientUtil.postJson(url, params, timeout);
return this.execute(url, params, handler, template, rpcPostRetry, Method.POST);
}
enum RpcStatus {
/**
* 成功
*/
SUCCESS(1),
/**
* 告警(请求成功,但非理想值)
*/
WARNING(2),
/**
* 异常
*/
EXCEPTION(3),
/**
* 超时
*/
TIMEOUT(4);
private int value;
RpcStatus(int value) {
this.value = value;
}
public int getValue() {
return value;
}
}
enum Method {
/**
* Get请求
*/
GET,
/**
* Post表单请求
*/
POST,
}
/**
* 模板调用
*/
@FunctionalInterface
public interface CallTemplate {
/**
* 模板调用类
*/
String executeCall();
}
/**
* 报文解析
*/
@FunctionalInterface
public interface CallHandler<T> {
/**
* 报文解析
*
* @param resp
* response body
*/
T handle(String resp) throws IOException;
/**
* 校验数据体
*
* @param t
* @return
*/
default boolean check(T t) {
return true;
}
}
}
package com.netease.mail.yanxuan.change.integration.rpc;
import com.netease.mail.yanxuan.change.integration.email.exception.RpcException;
public class RpcTimeoutException extends RpcException {
public RpcTimeoutException(String message) {
super(message);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment