[java][db]JAVA分布式事务原理及应用
时间:2022-03-10 17:11
JTA(Java Transaction API)允许应用程序执行分布式事务处理--在两个或多个网络计算机资源上访问并且更新数据。JDBC驱动程序的JTA支持极大地增强了数据访问能力。
1—UserTransaction—javax.transaction.UserTransaction接口提供能够编程地控制事务处理范围的应用程序。 javax.transaction.UserTransaction方法开启一个全局事务并且使用调用线程与事务处理关联。
2—Transaction Manager—javax.transaction.TransactionManager接口允许应用程序服务器来控制代表正在管理的应用程序的事务范围。
3—XAResource—javax.transaction.xa.XAResource接口是一个基于X/Open CAE Specification的行业标准XA接口的Java映射。
注意,一个限制性环节是通过JDBC驱动程序的XAResource接口的支持。JDBC驱动程序必须支持两个正常的JDBC交互作用:应用程序和/或应用程序服务器,而且以及JTA的XAResource部分。
编写应用程序水平代码的开发者不会关心分布式事务处理管理的细节。 这是分布式事务处理基本结构的工作—应用程序服务器、事务管理程序和JDBC驱动程序。应用程序代码中唯一的需要注意的就是当连接处于一个分布式事务范围内的时候,不应该调用一个会影响事务边界的方法。特别的是,一个应用程序不应该调用Connection方法commit、rollback和setAutoCommit(true),因为它们将破坏分布式事务的基本结构管理。
分布式事务处理
事务管理程序是分布式事务基本结构的基本组件;然而JDBC驱动程序和应用程序服务器组件应该具备下面的特征:
驱动程序应该实现JDBC 2.0应用程序接口,包括Optional Package接口XADataSource和XAConnection以及JTA接口XAResource。
应用程序服务器应该提供一个DataSource类,用来实现与分布式事务基本结的交互以及一个连接池模块(用于改善性能)。
分布式事务处理的第一步就是应用程序要发送一个事务请求到事务管理程序。虽然最后的commit/rollback决定把事务作为一个简单的逻辑单元来对待,但是仍然可能会包括许多事务分支。一个事务分支与一个到包含在分布式事务中的每个资源管理程序相关联。因此,到三个不同的关系数据库管理的请求需要三个事务分支。每个事务分支必须由本地资源管理程序提交或者返回。事务管理程序控制事务的边界,并且负责最后决定应该提交或者返回的全部事务。 这个决定由两个步骤组成,称为Two - Phase Commit Protocol。
在第一步骤中,事务管理程序轮询所有包含在分布式事务中的资源管理程序(关系数据库管理)来看看哪个可以准备提交。如果一个资源管理程序不能提交,它将不响应,并且把事务的特定部分返回,以便数据不被修改。
在第二步骤中,事务管理程序判断否定响应的资源管理程序中是否有能够返回整个事务的。如果没有否定响应的话,翻译管理程序提交整个事务并且返回结果到应用程序中。
开发事项管理程序代码的开发者必须与所有三个JTA接口有关:UserTransaction、TransactionManager和XAResource,这三个接口都被描述在
Sun JTA specification中。JDBC驱动程序开发者只需要关心XAResource接口。这个接口是允许一个资源管理程序参与事务的行业标准X/Open XA协议的Java映射。连接XAResource接口的驱动程序组件负责在事务管理程序和资源管理程序之间担任"翻译"的任务。下面的章节提供了XAResource调用的例子。
JDBC驱动程序和XAResource
为了简化XAResource的说明,这些例子说明了一个应用程序在不包含应用程序服务器和事项管理程序的情况下应该如何使用JTA。 基本上,这些例子中的应用程序也担任应用程序服务器和事项管理程序的任务。大部分的企业使用事务管理程序和应用程序服务器,因为它们能够比一个应用程序更能够高效地管理分布式事务。 然而遵循这些例子,一个应用程序开发者可以测试在JDBC驱动程序中的JTA支持的健壮性。而且有一些例子可能不是工作在某个特定的数据库上,这是因为关联在数据库上的一些内在的问题。
在使用JTA之前,你必须首先实现一个Xid类用来标识事务(在普通情况下这将由事务管理程序来处理)。Xid包含三个元素:formatID、gtrid(全局事务标识符)和bqual(分支修饰词标识符)。
formatID通常是零,这意味着你将使用OSI CCR(Open Systems Interconnection Commitment, Concurrency和Recovery 标准)来命名。如果你要是用另外一种格式,那么formatID应该大于零。-1值意味着Xid为无效。
gtrid和bqual可以包含64个字节二进制码来分别标识全局事务和分支事务。唯一的要求是gtrid和bqual必须是全局唯一的。此外,这可以通过使用指定在OSI CCR中的命名规则规范来完成。
下面的例子说明Xid的实现:
public class MyXid implements Xid
{
protected int formatId;
protected byte gtrid[];
protected byte bqual[];
public MyXid()
{
}
public MyXid(int formatId, byte gtrid[], byte bqual[])
{
this.formatId = formatId;
this.gtrid = gtrid;
this.bqual = bqual;
}
public int getFormatId()
{
return formatId;
}
public byte[] getBranchQualifier()
{
return bqual;
}
public byte[] getGlobalTransactionId()
{
return gtrid;
}
}
其次,你需要创建一个你要使用的数据库的数据源:
public DataSource getDataSource()
throws SQLException
{
SQLServerDataSource xaDS = new
com.merant.datadirect.jdbcx.sqlserver.SQLServerDataSource();
xaDS.setDataSourceName("SQLServer");
xaDS.setServerName("server");
xaDS.setPortNumber(1433);
xaDS.setSelectMethod("cursor");
return xaDS;
}
例1—这个例子是用“两步提交协议”来提交一个事务分支:
XADataSource xaDS;
XAConnection xaCon;
XAResource xaRes;
Xid xid;
Connection con;
Statement stmt;
int ret;
xaDS = getDataSource();
xaCon = xaDS.getXAConnection("jdbc_user", "jdbc_password");
xaRes = xaCon.getXAResource();
con = xaCon.getConnection();
stmt = con.createStatement();
xid = new MyXid(100, new byte[]{0x01}, new byte[]{0x02});
try {
xaRes.start(xid, XAResource.TMNOFLAGS);
stmt.executeUpdate("insert into test_table values (100)");
xaRes.end(xid, XAResource.TMSUCCESS);
ret = xaRes.prepare(xid);
if (ret == XAResource.XA_OK) {
xaRes.commit(xid, false);
}
}
catch (XAException e) {
e.printStackTrace();
}
finally {
stmt.close();
con.close();
xaCon.close();
}
因为所有这些例子中的初始化代码相同或者非常相似,仅仅是一些重要的地方的代码由不同。
例2—这个例子,与例1相似,说明了一个返回过程:
xaRes.start(xid, XAResource.TMNOFLAGS);
stmt.executeUpdate("insert into test_table values (100)");
xaRes.end(xid, XAResource.TMSUCCESS);
ret = xaRes.prepare(xid);
if (ret == XAResource.XA_OK) {
xaRes.rollback(xid);
}
例3—这个例子说明一个分布式事务分支如何中止,让相同的连接做本地事务处理,以及它们稍后该如何继续这个分支。 分布式事务的两步提交作用不影响本地事务。
xid = new MyXid(100, new byte[]{0x01}, new byte[]{0x02});
xaRes.start(xid, XAResource.TMNOFLAGS);
stmt.executeUpdate("insert into test_table values (100)");
xaRes.end(xid, XAResource.TMSUSPEND);
∥这个更新在事务范围之外完成,所以它不受XA返回影响。
stmt.executeUpdate("insert into test_table2 values (111)");
xaRes.start(xid, XAResource.TMRESUME);
stmt.executeUpdate("insert into test_table values (200)");
xaRes.end(xid, XAResource.TMSUCCESS);
ret = xaRes.prepare(xid);
if (ret == XAResource.XA_OK) {
xaRes.rollback(xid);
}
例4—这个例子说明一个XA资源如何分担不同的事务。 创建了两个事务分支,但是它们不属于相同的分布式事务。 JTA允许XA资源在第一个分支上做一个两步提交,虽然这个资源仍然与第二个分支相关联。
xid1 = new MyXid(100, new byte[]{0x01}, new byte[]{0x02});
xid2 = new MyXid(100, new byte[]{0x11}, new byte[]{0x22});
xaRes.start(xid1, XAResource.TMNOFLAGS);
stmt.executeUpdate("insert into test_table1 values (100)");
xaRes.end(xid1, XAResource.TMSUCCESS);
xaRes.start(xid2, XAResource.TMNOFLAGS);
ret = xaRes.prepare(xid1);
if (ret == XAResource.XA_OK) {
xaRes.commit(xid2, false);
}
stmt.executeUpdate("insert into test_table2 values (200)");
xaRes.end(xid2, XAResource.TMSUCCESS);
ret = xaRes.prepare(xid2);
if (ret == XAResource.XA_OK) {
xaRes.rollback(xid2);
}
例5—这个例子说明不同的连接上的事务分支如何连接成为一个单独的分支,如果它们连接到相同的资源管理程序。这个特点改善了分布式事务的效率,因为它减少了两步提交处理的数目。两个连接到数据库服务器上的XA将被创建。每个连接创建它自己的XA资源,正规的JDBC连接和语句。在第二个XA资源开始一个事务分支之前,它将察看是否使用和第一个XA资源使用的是同一个资源管理程序。如果这是实例,它将加入在第一个XA连接上创建的第一个分支,而不是创建一个新的分支。 稍后,这个事务分支使用XA资源来准备和提交。
xaDS = getDataSource();
xaCon1 = xaDS.getXAConnection("jdbc_user", "jdbc_password");
xaRes1 = xaCon1.getXAResource();
con1 = xaCon1.getConnection();
stmt1 = con1.createStatement();
xid1 = new MyXid(100, new byte[]{0x01}, new byte[]{0x02});
xaRes1.start(xid1, XAResource.TMNOFLAGS);
stmt1.executeUpdate("insert into test_table1 values (100)");
xaRes1.end(xid, XAResource.TMSUCCESS);
xaCon2 = xaDS.getXAConnection("jdbc_user", "jdbc_password");
xaRes2 = xaCon1.getXAResource();
con2 = xaCon1.getConnection();
stmt2 = con1.createStatement();
if (xaRes2.isSameRM(xaRes1)) {
xaRes2.start(xid1, XAResource.TMJOIN);
stmt2.executeUpdate("insert into test_table2 values (100)");
xaRes2.end(xid1, XAResource.TMSUCCESS);
}
else {
xid2 = new MyXid(100, new byte[]{0x01}, new byte[]{0x03});
xaRes2.start(xid2, XAResource.TMNOFLAGS);
stmt2.executeUpdate("insert into test_table2 values (100)");
xaRes2.end(xid2, XAResource.TMSUCCESS);
ret = xaRes2.prepare(xid2);
if (ret == XAResource.XA_OK) {
xaRes2.commit(xid2, false);
}
}
ret = xaRes1.prepare(xid1);
if (ret == XAResource.XA_OK) {
xaRes1.commit(xid1, false);
}
例6—这个例子说明在错误恢复的阶段,如何恢复准备好的或者快要完成的事务分支。 它首先试图返回每个分支;如果它失败了,它尝试着让资源管理程序丢掉关于事务的消息。
MyXid[] xids;
xids = xaRes.recover(XAResource.TMSTARTRSCAN | XAResource.TMENDRSCAN);
for (int i=0; xids!=null && i
try {
xaRes.rollback(xids[i]);
}
catch (XAException ex) {
try {
xaRes.forget(xids[i]);
}
catch (XAException ex1) {
System.out.println("rollback/forget failed: " + ex1.errorCode);
}
}
}
[java][db]JAVA分布式事务原理及应用,布布扣,bubuko.com