之前介绍过thrift使用, 但是只知道使用总是不够满足,这次我们接着之前的代码,来看下这个具体的实现和处理流程,在介绍之前先看下几个重要的接口
传输层 TTransport : 负责数据传输(大部分场景为负责网络传输)
协议层 TProtocol: 负责进行数据结构的序列化与反序列化(依赖TTransport 进行数据传输及读取)
处理层 TProcessor : 使用协议和传输接口进行具体处理处理
server初始化
再次看下Server的初始化启动代码
1 2 3 4 5 6 7 8 9 10 11 12
| TServerTransport serverTransport = new TServerSocket(12345);
UserService.Processor<UserServiceImpl> processor = new UserService.Processor<>(new UserServiceImpl()); final TServer.Args serverArgs = new TServer.Args(serverTransport).processor(processor);
TServer server = new TSimpleServer(serverArgs); System.out.println("Starting the simple server..."); server.serve();
|
下面据此来进入源码,了解一下具体实现(本次使用的代码为 libthrift : 0.15.0)
网络传输初始化
先来看下传输层TServerTransport,对应的实现TServerSocket就是对平时我们使用的ServerSocket的包装,负责进行相关的网络传输
1 2 3 4
| public class TServerSocket extends TServerTransport { private ServerSocket serverSocket_ = null; }
|
相关参数构造
接着分析一下代码UserService.Processor<UserServiceImpl> processor = new UserService.Processor<>(new UserServiceImpl());
,这是 UserService.Processor的构造过程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
|
public abstract class TBaseProcessor<I> implements TProcessor { private final I iface; private final Map<String, ProcessFunction<I, ? extends TBase>> processMap;
@Override public void process(TProtocol in, TProtocol out) throws TException { TMessage msg = in.readMessageBegin(); ProcessFunction fn = processMap.get(msg.name); fn.process(msg.seqid, in, out, iface); } }
public static class Processor<I extends Iface> extends TBaseProcessor<I> implements TProcessor { public Processor(I iface) { super(iface, getProcessMap(new HashMap<String, ProcessFunction<I, ? extends TBase>>())); }
private static <I extends Iface> Map<String, ProcessFunction<I, ? extends TBase>> getProcessMap(Map<String, ProcessFunction<I, ? extends TBase>> processMap) { processMap.put("searchUsers", new searchUsers()); return processMap; } }
|
再来看下 TServer.Args,它其实类似于我们平时使用的builder模式,用来构造 TServer使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public static class Args extends AbstractServerArgs<Args> { public Args(TServerTransport transport) { super(transport); } }
public static abstract class AbstractServerArgs<T extends AbstractServerArgs<T>> { final TServerTransport serverTransport; TProcessorFactory processorFactory; TTransportFactory inputTransportFactory = new TTransportFactory(); TTransportFactory outputTransportFactory = new TTransportFactory(); TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory(); TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory(); }
|
创建server实例启动
最后启动server,这次我们只看最简单的 TSimpleServer 实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public void serve() { while (!stopped_) { TTransport client = null; TProcessor processor = null; TTransport inputTransport = null; TTransport outputTransport = null; TProtocol inputProtocol = null; TProtocol outputProtocol = null; ServerContext connectionContext = null; try { client = serverTransport_.accept(); if (client != null) { processor = processorFactory_.getProcessor(client); inputTransport = inputTransportFactory_.getTransport(client); outputTransport = outputTransportFactory_.getTransport(client); inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); while (true) { processor.process(inputProtocol, outputProtocol); } } } catch (TTransportException ttx) { } } }
|
client初始化及调用
相关代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| TTransport transport = new TSocket("localhost", 12345); transport.open();
TProtocol protocol = new TBinaryProtocol(transport); UserService.Client client = new UserService.Client(protocol);
UserSearchResult userRes = client.searchUsers("zhangsan"); System.out.println(userRes);
transport.close();
|
下面我们来依次分析一下
初始化网络连接
1 2
| TTransport transport = new TSocket("localhost", 12345); transport.open();
|
TSocket是对java Socket的包装
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public void open() throws TTransportException { if (socket_ == null) { initSocket(); }
try { socket_.connect(new InetSocketAddress(host_, port_), connectTimeout_); inputStream_ = new BufferedInputStream(socket_.getInputStream()); outputStream_ = new BufferedOutputStream(socket_.getOutputStream()); } catch (IOException iox) { close(); throw new TTransportException(TTransportException.NOT_OPEN, iox); } }
|
构造客户端及调用
1 2 3 4
| TProtocol protocol = new TBinaryProtocol(transport); UserService.Client client = new UserService.Client(protocol);
UserSearchResult userRes = client.searchUsers("zhangsan");
|
UserService.Client也是thrift自动生成的代码,看下searchUsers
方法调用的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public static class Client extends org.apache.thrift.TServiceClient implements Iface { public UserSearchResult searchUsers(String name) throws TException { send_searchUsers(name); return recv_searchUsers(); } public void send_searchUsers(String name) throws TException { searchUsers_args args = new searchUsers_args(); args.setName(name); sendBase("searchUsers", args); } public UserSearchResult recv_searchUsers() throws TException { searchUsers_result result = new searchUsers_result(); receiveBase(result, "searchUsers"); if (result.isSetSuccess()) { return result.success; } throw new TApplicationException(MISSING_RESULT, "searchUsers failed: unknown result"); } }
|
thrift会为我们定义的结构和属性生成相关的类,包含我们定义的参数信息也会有对应的类,每个生成的类型会两个方法,read 和 write 来使用 TProtocol 来实现序列化和传输数据
这里以searchUsers_args(参数对应类)看一下相关的生成代码,代码量比较多,我们只看下核心部分实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
|
public static class searchUsers_args implements TBase<UserService.searchUsers_args, UserService.searchUsers_args._Fields>, Serializable, Cloneable, Comparable<UserService.searchUsers_args> { private static final TStruct STRUCT_DESC = new TStruct("searchUsers_args"); private static final TField NAME_FIELD_DESC = new TField("name", TType.STRING, (short)1);
private static final SchemeFactory STANDARD_SCHEME_FACTORY = new searchUsers_argsStandardSchemeFactory();
public String name;
public enum _Fields implements org.apache.thrift.TFieldIdEnum { NAME((short)1, "name");
private static final Map<String, UserService.searchUsers_args._Fields> byName = new HashMap<>();
}
public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { scheme(iprot).read(iprot, this); }
public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { scheme(oprot).write(oprot, this); }
private static class searchUsers_argsStandardSchemeFactory implements SchemeFactory { public UserService.searchUsers_args.searchUsers_argsStandardScheme getScheme() { return new UserService.searchUsers_args.searchUsers_argsStandardScheme(); } }
private static class searchUsers_argsStandardScheme extends StandardScheme<UserService.searchUsers_args> {
public void read(TProtocol iprot, UserService.searchUsers_args struct) throws TException { TField schemeField; iprot.readStructBegin(); while (true) { schemeField = iprot.readFieldBegin(); if (schemeField.type == TType.STOP) { break; } switch (schemeField.id) { case 1: if (schemeField.type == TType.STRING) { struct.name = iprot.readString(); struct.setNameIsSet(true); } else { TProtocolUtil.skip(iprot, schemeField.type); } break; default: TProtocolUtil.skip(iprot, schemeField.type); } iprot.readFieldEnd(); } iprot.readStructEnd(); }
public void write(TProtocol oprot, UserService.searchUsers_args struct) throws TException { oprot.writeStructBegin(STRUCT_DESC); if (struct.name != null) { oprot.writeFieldBegin(NAME_FIELD_DESC); oprot.writeString(struct.name); oprot.writeFieldEnd(); } oprot.writeFieldStop(); oprot.writeStructEnd(); }
}
private static <S extends IScheme> S scheme(TProtocol proto) { return (StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme(); } }
|
构造参数完成后,调用sendBase进行数据发送,sendBase为父类TServiceClient中的方法
消息接收则使用对应的receiveBase方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public abstract class TServiceClient { protected void sendBase(String methodName, TBase<?,?> args) throws TException { sendBase(methodName, args, TMessageType.CALL); } private void sendBase(String methodName, TBase<?,?> args, byte type) throws TException { oprot_.writeMessageBegin(new TMessage(methodName, type, ++seqid_)); args.write(oprot_); oprot_.writeMessageEnd(); oprot_.getTransport().flush(); } protected void receiveBase(TBase<?,?> result, String methodName) throws TException { TMessage msg = iprot_.readMessageBegin(); if (msg.type == TMessageType.EXCEPTION) { TApplicationException x = new TApplicationException(); x.read(iprot_); iprot_.readMessageEnd(); throw x; } if (msg.seqid != seqid_) { throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, String.format("%s failed: out of sequence response: expected %d but got %d", methodName, seqid_, msg.seqid)); } result.read(iprot_); iprot_.readMessageEnd(); } }
|
server接收请求处理及响应
之前server部分看到处理化等待连接,这里直接从接收到请求处理逻辑开始
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public void process(TProtocol in, TProtocol out) throws TException { TMessage msg = in.readMessageBegin(); ProcessFunction fn = processMap.get(msg.name); if (fn == null) { TProtocolUtil.skip(in, TType.STRUCT); in.readMessageEnd(); TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'"); out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid)); x.write(out); out.writeMessageEnd(); out.getTransport().flush(); } else { fn.process(msg.seqid, in, out, iface); } }
|
这里我们关注下ProcessFunction
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| public abstract class ProcessFunction<I, T extends TBase> { private final String methodName;
public final void process(int seqid, TProtocol iprot, TProtocol oprot, I iface) throws TException { T args = getEmptyArgsInstance(); try { args.read(iprot); } catch (TProtocolException e) { } iprot.readMessageEnd(); TSerializable result = null; byte msgType = TMessageType.REPLY;
try { result = getResult(iface, args); } catch (TTransportException ex) { }
if(!isOneway()) { oprot.writeMessageBegin(new TMessage(getMethodName(), msgType, seqid)); result.write(oprot); oprot.writeMessageEnd(); oprot.getTransport().flush(); } }
protected boolean rethrowUnhandledExceptions(){ return false; }
protected abstract boolean isOneway();
public abstract TBase getResult(I iface, T args) throws TException;
public abstract T getEmptyArgsInstance();
public String getMethodName() { return methodName; } }
|
对于上面例子,对应的ProcessFunction子类为 searchUsers
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public static class searchUsers<I extends UserService.Iface> extends ProcessFunction<I, UserService.searchUsers_args> { public searchUsers() { super("searchUsers"); }
public UserService.searchUsers_args getEmptyArgsInstance() { return new UserService.searchUsers_args(); }
protected boolean isOneway() { return false; }
@Override protected boolean rethrowUnhandledExceptions() { return false; }
public UserService.searchUsers_result getResult(I iface, UserService.searchUsers_args args) throws TException { UserService.searchUsers_result result = new UserService.searchUsers_result(); result.success = iface.searchUsers(args.name); return result; } }
|
以上即为简单的一次thrift调用和响应的实现逻辑流程
简单总结一下就是thrift会为每种类型生成对应的一个结构,包括如果参数为多个字段也会合并生成一个结构TBase,其中会使用TProtocol(其中会使用TTransport进行消息发送)进行对应结构数据的写入和读取
每次消息会被包装成一个TMessage,其中会包括方法名、消息类型及递增的一个序号(收到响应时用来进行对应)