Mongoose 中的交易
交易讓您可以在隔離狀態下執行多個操作,並且如果其中一個操作失敗,可能會取消所有操作。本指南將引導您開始在 Mongoose 中使用交易。
交易入門
如果您尚未匯入 mongoose,請先匯入
import mongoose from 'mongoose';
要建立交易,您首先需要使用 Mongoose#startSession
或 Connection#startSession()
建立一個會話。
// Using Mongoose's default connection
const session = await mongoose.startSession();
// Using custom connection
const db = await mongoose.createConnection(mongodbUri).asPromise();
const session = await db.startSession();
在實務上,您應該使用 session.withTransaction()
輔助函式或 Mongoose 的 Connection#transaction()
函式來執行交易。session.withTransaction()
輔助函式會處理
- 建立交易
- 如果成功,則提交交易
- 如果您的操作拋出異常,則中止交易
- 在發生暫時性交易錯誤時重試。
let session = null;
return Customer.createCollection().
then(() => Customer.startSession()).
// The `withTransaction()` function's first parameter is a function
// that returns a promise.
then(_session => {
session = _session;
return session.withTransaction(() => {
return Customer.create([{ name: 'Test' }], { session: session });
});
}).
then(() => Customer.countDocuments()).
then(count => assert.strictEqual(count, 1)).
then(() => session.endSession());
有關 ClientSession#withTransaction()
函式的更多資訊,請參閱MongoDB Node.js 驅動程式文件。
Mongoose 的 Connection#transaction()
函式是 withTransaction()
的包裝函式,它將 Mongoose 的變更追蹤與交易整合在一起。例如,假設您在稍後失敗的交易中 save()
文件。該文件中的變更不會持續到 MongoDB。Connection#transaction()
函式會通知 Mongoose 變更追蹤,save()
已回滾,並將交易中變更的所有欄位標記為已修改。
const doc = new Person({ name: 'Will Riker' });
await db.transaction(async function setRank(session) {
doc.name = 'Captain';
await doc.save({ session });
doc.isNew; // false
// Throw an error to abort the transaction
throw new Error('Oops!');
}, { readPreference: 'primary' }).catch(() => {});
// true, `transaction()` reset the document's state because the
// transaction was aborted.
doc.isNew;
關於交易中平行處理的注意事項
在交易期間不支援平行執行操作。使用 Promise.all
、Promise.allSettled
、Promise.race
等來平行化交易內的操作是未定義的行為,應避免使用。
使用 Mongoose 文件和 save()
如果您使用會話從 findOne()
或 find()
取得 Mongoose 文件,該文件將保留對該會話的引用,並將該會話用於 save()
。
要取得/設定與給定文件關聯的會話,請使用 doc.$session()
。
const User = db.model('User', new Schema({ name: String }));
let session = null;
return User.createCollection().
then(() => db.startSession()).
then(_session => {
session = _session;
return User.create({ name: 'foo' });
}).
then(() => {
session.startTransaction();
return User.findOne({ name: 'foo' }).session(session);
}).
then(user => {
// Getter/setter for the session associated with this document.
assert.ok(user.$session());
user.name = 'bar';
// By default, `save()` uses the associated session
return user.save();
}).
then(() => User.findOne({ name: 'bar' })).
// Won't find the doc because `save()` is part of an uncommitted transaction
then(doc => assert.ok(!doc)).
then(() => session.commitTransaction()).
then(() => session.endSession()).
then(() => User.findOne({ name: 'bar' })).
then(doc => assert.ok(doc));
使用聚合框架
Model.aggregate()
函式也支援交易。Mongoose 聚合有一個 session()
輔助函式,用於設定 session
選項。以下是在交易中執行聚合的範例。
const Event = db.model('Event', new Schema({ createdAt: Date }), 'Event');
let session = null;
return Event.createCollection().
then(() => db.startSession()).
then(_session => {
session = _session;
session.startTransaction();
return Event.insertMany([
{ createdAt: new Date('2018-06-01') },
{ createdAt: new Date('2018-06-02') },
{ createdAt: new Date('2017-06-01') },
{ createdAt: new Date('2017-05-31') }
], { session: session });
}).
then(() => Event.aggregate([
{
$group: {
_id: {
month: { $month: '$createdAt' },
year: { $year: '$createdAt' }
},
count: { $sum: 1 }
}
},
{ $sort: { count: -1, '_id.year': -1, '_id.month': -1 } }
]).session(session)).
then(res => assert.deepEqual(res, [
{ _id: { month: 6, year: 2018 }, count: 2 },
{ _id: { month: 6, year: 2017 }, count: 1 },
{ _id: { month: 5, year: 2017 }, count: 1 }
])).
then(() => session.commitTransaction()).
then(() => session.endSession());
使用 AsyncLocalStorage
Mongoose 中交易的一個主要痛點是您需要記住在每個操作上設定 session
選項。如果您不這樣做,您的操作將在交易之外執行。Mongoose 8.4 能夠使用 Node 的 AsyncLocalStorage API 在 Connection.prototype.transaction()
執行器函式中的所有操作上設定 session
操作。使用 mongoose.set('transactionAsyncLocalStorage', true)
設定 transactionAsyncLocalStorage
選項以啟用此功能。
mongoose.set('transactionAsyncLocalStorage', true);
const Test = mongoose.model('Test', mongoose.Schema({ name: String }));
const doc = new Test({ name: 'test' });
// Save a new doc in a transaction that aborts
await connection.transaction(async() => {
await doc.save(); // Notice no session here
throw new Error('Oops');
}).catch(() => {});
// false, `save()` was rolled back
await Test.exists({ _id: doc._id });
使用 transactionAsyncLocalStorage
,您不再需要將會話傳遞給每個操作。Mongoose 將在後台預設新增會話。
進階用法
想要更精細地控制提交或中止交易時間的進階使用者可以使用 session.startTransaction()
來開始交易
const Customer = db.model('Customer', new Schema({ name: String }));
let session = null;
return Customer.createCollection().
then(() => db.startSession()).
then(_session => {
session = _session;
// Start a transaction
session.startTransaction();
// This `create()` is part of the transaction because of the `session`
// option.
return Customer.create([{ name: 'Test' }], { session: session });
}).
// Transactions execute in isolation, so unless you pass a `session`
// to `findOne()` you won't see the document until the transaction
// is committed.
then(() => Customer.findOne({ name: 'Test' })).
then(doc => assert.ok(!doc)).
// This `findOne()` will return the doc, because passing the `session`
// means this `findOne()` will run as part of the transaction.
then(() => Customer.findOne({ name: 'Test' }).session(session)).
then(doc => assert.ok(doc)).
// Once the transaction is committed, the write operation becomes
// visible outside of the transaction.
then(() => session.commitTransaction()).
then(() => Customer.findOne({ name: 'Test' })).
then(doc => assert.ok(doc)).
then(() => session.endSession());
您也可以使用 session.abortTransaction()
來中止交易
let session = null;
return Customer.createCollection().
then(() => Customer.startSession()).
then(_session => {
session = _session;
session.startTransaction();
return Customer.create([{ name: 'Test' }], { session: session });
}).
then(() => Customer.create([{ name: 'Test2' }], { session: session })).
then(() => session.abortTransaction()).
then(() => Customer.countDocuments()).
then(count => assert.strictEqual(count, 0)).
then(() => session.endSession());