外观
外观
2740字约9分钟
2024-05-18
首先无论是请求或响应,都会涉及参数的传输。而Java 对象是存活在 JVM 虚拟机中的,如果想在其他位置存储并访问、或者在网络中进行传输,就需要进行序列化和反序列化。
此前已经编写了通用的序列化器接口,并实现基于 Java 原生序列化的序列化器,但仍需考虑三个问题:
目前市面上还有很多种主流的序列化方式,如:JSON、Hessian、Kryo、protobuf 等。
主流序列化方式对比
a)JSON
优点:
缺点:
b)Hessian:
[https://hessian.caucho.com/]:
优点:
缺点:
c)Kryo
优点:
缺点:
d)Protobuf
优点:
缺点:
定义一个 序列化器名称 => 序列化器实现类对象
的 Map,然后根据名称从 Map 中获取对象即可。
使用 SPI 机制。
什么是 SPI?
SPI(Service Provider Interface)服务提供接口是 Java 的机制,主要用于实现模块化开发和插件化扩展。
SPI 机制允许服务提供者通过特定的配置文件将自己的实现注册到系统中,然后系统通过反射机制动态加载这些实现,而不需要修改原始框架的代码,从而实现了系统的解耦、提高了可扩展性。
一个典型的 SPI 应用场景是 JDBC(Java 数据库连接库),不同的数据库驱动程序开发者可以使用 JDBC 库,然后定制自己的数据库驱动程序。
此外,我们使用的主流 Java 开发框架中,几乎都使用到了 SPI 机制,比如 Servlet 容器、日志框架、ORM 框架、Spring 框架。
a)首先给项目的 pom.xml 中引入依赖:
<!-- 序列化 -->
<!-- https://mvnrepository.com/artifact/com.caucho/hessian -->
<dependency>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
<version>4.0.66</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.esotericsoftware/kryo -->
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>5.6.0</version>
</dependency>
b)然后在序列化器包 serializer
中分别实现这三种序列化器;
JSON序列化器
JSON 序列化器的实现相对复杂,要考虑一些对象转换的兼容性问题,比如 Object 数组在序列化后会丢失类型。
public class JsonSerializer implements Serializer {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Override
public <T> byte[] serialize(T obj) throws IOException {
return OBJECT_MAPPER.writeValueAsBytes(obj);
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> classType) throws IOException {
T obj = OBJECT_MAPPER.readValue(bytes, classType);
if (obj instanceof RpcRequest) {
return handleRequest((RpcRequest) obj, classType);
}
if (obj instanceof RpcResponse) {
return handleResponse((RpcResponse) obj, classType);
}
return obj;
}
/**
* 由于 Object 的原始对象会被擦除,导致反序列化时会被作为 LinkedHashMap 无法转换成原始对象,因此这里做了特殊处理
*
* @param rpcRequest rpc 请求
* @param type 类型
* @return {@link T}
* @throws IOException IO异常
*/
private <T> T handleRequest(RpcRequest rpcRequest, Class<T> type) throws IOException {
Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
Object[] args = rpcRequest.getArgs();
// 循环处理每个参数的类型
for (int i = 0; i < parameterTypes.length; i++) {
Class<?> clazz = parameterTypes[i];
// 如果类型不同,则重新处理一下类型
if (!clazz.isAssignableFrom(args[i].getClass())) {
byte[] argBytes = OBJECT_MAPPER.writeValueAsBytes(args[i]);
args[i] = OBJECT_MAPPER.readValue(argBytes, clazz);
}
}
return type.cast(rpcRequest);
}
/**
* 由于 Object 的原始对象会被擦除,导致反序列化时会被作为 LinkedHashMap 无法转换成原始对象,因此这里做了特殊处理
*
* @param rpcResponse rpc 响应
* @param type 类型
* @return {@link T}
* @throws IOException IO异常
*/
private <T> T handleResponse(RpcResponse rpcResponse, Class<T> type) throws IOException {
// 处理响应数据
byte[] dataBytes = OBJECT_MAPPER.writeValueAsBytes(rpcResponse.getData());
rpcResponse.setData(OBJECT_MAPPER.readValue(dataBytes, rpcResponse.getDataType()));
return type.cast(rpcResponse);
}
}
Kryo 序列化器
Kryo 本身是线程不安全的,所以需要使用 ThreadLocal 保证每个线程有一个单独的 Kryo 对象实例。
public class KryoSerializer implements Serializer {
/**
* kryo 线程不安全,使用 ThreadLocal 保证每个线程只有一个 Kryo
*/
private static final ThreadLocal<Kryo> KRYO_THREAD_LOCAL = ThreadLocal.withInitial(() -> {
Kryo kryo = new Kryo();
// 设置动态动态序列化和反序列化类,不提前注册所有类(可能有安全问题)
kryo.setRegistrationRequired(false);
return kryo;
});
@Override
public <T> byte[] serialize(T obj) {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
Output output = new Output(byteArrayOutputStream);
KRYO_THREAD_LOCAL.get().writeObject(output, obj);
output.close();
return byteArrayOutputStream.toByteArray();
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> classType) {
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
Input input = new Input(byteArrayInputStream);
T result = KRYO_THREAD_LOCAL.get().readObject(input, classType);
input.close();
return result;
}
}
Hessian 序列化器
public class HessianSerializer implements Serializer {
@Override
public <T> byte[] serialize(T object) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
HessianOutput ho = new HessianOutput(bos);
ho.writeObject(object);
return bos.toByteArray();
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> tClass) throws IOException {
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
HessianInput hi = new HessianInput(bis);
return (T) hi.readObject(tClass);
}
}
a)首先定义序列化器名称的常量,使用接口实现。
public interface SerializerKeys {
String JDK = "jdk";
String JSON = "json";
String KRYO = "kryo";
String HESSIAN = "hessian";
}
b)定义序列化器工厂。
序列化器对象是可以复用的,没必要每次执行序列化操作前都创建一个新的对象。所以我们可以使用设计模式中的 工厂模式 + 单例模式 来简化创建和获取序列化器对象的操作。
序列化器工厂代码如下,使用 Map 来维护序列化器实例:
public class SerializerFactory {
/**
* 序列化映射(用于实现单例)
*/
private static final Map<String, Serializer> KEY_SERIALIZER_MAP = new HashMap<String, Serializer>() {{
put(SerializerKeys.JDK, new JdkSerializer());
put(SerializerKeys.JSON, new JsonSerializer());
put(SerializerKeys.KRYO, new KryoSerializer());
put(SerializerKeys.HESSIAN, new HessianSerializer());
}};
/**
* 默认序列化器
*/
private static final Serializer DEFAULT_SERIALIZER = KEY_SERIALIZER_MAP.get("jdk");
/**
* 获取实例
*
* @param key
* @return
*/
public static Serializer getInstance(String key) {
return KEY_SERIALIZER_MAP.getOrDefault(key, DEFAULT_SERIALIZER);
}
}
c)在全局配置类 RpcConfig 中补充序列化器的配置
/**
* 序列化器
*/
private String serializer = SerializerKeys.JDK;
d)动态获取序列化器。
需要将之前代码中所有用到序列化器的位置更改为 “使用工厂 + 读取配置” 来获取实现类。
要更改的类:
使用自定义的 SPI 机制实现,支持用户自定义序列化器并指定键名。
a)指定 SPI 配置目录。
系统内置的 SPI 机制会加载 resources
资源目录下的 META-INF/services
目录,那自定义的序列化器可以如法炮制,改为读取 META-INF/rpc
目录。
另外还可以将 SPI 配置再分为系统内置 SPI 和用户自定义 SPI,即目录如下:
JdkSerializer
。 这样一来,所有接口的实现类都可以通过 SPI 动态加载,不用在代码中硬编码 Map 来维护实现类了。
在静态资源包里添加一个系统扩展配置文件,内容为之前写好的序列化器。
jdk=com.honghu.corerpc.serializer.JdkSerializer
hessian=com.honghu.corerpc.serializer.HessianSerializer
json=com.honghu.corerpc.serializer.JsonSerializer
kryo=com.honghu.corerpc.serializer.KryoSerializer
2)编写 SpiLoader 加载器。
相当于一个工具类,提供了读取配置并加载实现类的方法。
关键实现如下:
键名 => 实现类
。键名 => 实现类
信息并存储在 Map 中。@Slf4j
public class SpiLoader {
/**
* 加载已经加载的类:接口名 => (key => 实现类)
*/
private static Map<String,Map<String,Class<?>>> loaderMap = new ConcurrentHashMap<>();
/**
* 对象实例缓存(避免重复 new),类路径 => 对象实例,单例模式
*/
private static Map<String,Object> instanceCache = new ConcurrentHashMap<>();
/**
* 系统 SPI 目录
*/
private static final String RPC_SYSTEM_SPI_DIR = "META-INF/rpc/system/";
/**
* 用户 SPI 目录
*/
private static final String RPC_CUSTOM_SPI_DIR = "META-INF/rpc/custom/";
/**
* 扫描路径
*/
private static final String[] SCAN_DIRS = new String[]{RPC_CUSTOM_SPI_DIR,RPC_SYSTEM_SPI_DIR};
/**
* 动态加载的类列表
*/
private static final List<Class<?>> LOAD_CLASS_LIST = Arrays.asList(Serializer.class);
/**
* 加载所有类型
*/
public static void loadAll(){
log.info("加载所有 SPI");
for(Class<?> aClass : LOAD_CLASS_LIST){
load(aClass);
}
}
public static <T> T getInstance(Class<?> tClass,String key){
String tClassName = tClass.getName();
Map<String,Class<?>> keyClassMap = loaderMap.get(tClassName);
if(keyClassMap == null){
throw new RuntimeException(String.format("SpiLoader 的 %s 不存在 key=%s 的类型",tClassName,key));
}
if(!keyClassMap.containsKey(key)){
throw new RuntimeException(String.format("SpiLoader 的 %s 不存在 key=%s 的类型",tClassName,key));
}
//获取要实现的加载类型
Class<?> implClass = keyClassMap.get(key);
String implClassName = implClass.getName();
if(!instanceCache.containsKey(implClassName)){
try {
instanceCache.put(implClassName,implClass.newInstance());
}catch (InstantiationException | IllegalAccessException e){
String errorMsg = String.format("%s 类实例化失败",implClassName);
throw new RuntimeException(errorMsg,e);
}
}
return (T)instanceCache.get(implClassName);
}
/**
* 加载某个类型
*
* @param loadClass
* @return
*/
public static Map<String,Class<?>> load(Class<?> loadClass) {
log.info("加载类型为 {} 的 SPI",loadClass.getName());
//扫描路径,用户自定义的 SPI 优先级高于系统 SPI
Map<String,Class<?>> keyClassMap = new HashMap<>();
for (String scanDir : SCAN_DIRS){
List<URL> resources = ResourceUtil.getResources(scanDir + loadClass.getName());
//读取每个资源文件
for (URL resource : resources) {
try{
InputStreamReader inputStreamReader = new InputStreamReader(resource.openStream());
BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
String line;
while ((line = bufferedReader.readLine()) != null){
String[] strArray = line.split("=");
if(strArray.length > 1){
String key = strArray[0];
String className = strArray[1];
keyClassMap.put(key,Class.forName(className));
}
}
}catch (Exception e){
log.info("SPI resource load error",e);
}
}
}
loaderMap.put(loadClass.getName(),keyClassMap);
return keyClassMap;
}
}
3)重构序列化器工厂。
之前是通过在工厂中硬编码 HashMap 来存储序列化器和实现类的,有了 SPI 后,就可以改为从 SPI 加载指定的序列化器对象。
public class SerializerFactory {
static {
SpiLoader.load(Serializer.class);
}
/**
* 默认序列化器
*/
private static final Serializer DEFAULT_SERIALIZER = new JdkSerializer();
/**
* 获取实例
*
* @param key
* @return
*/
public static Serializer getInstance(String key) {
return SpiLoader.getInstance(Serializer.class, key);
}
}