当前位置:首页 >> 脚本专栏

使用Python的Django框架实现事务交易管理的教程

 如果你花费了很多的时间去进行Django数据库事务处理的话,你将会了解到这是让人晕头转向的。

在过去,只是提供了简单的基础文档,要想清楚知道它是怎么使用的,还必须要通过创建和执行Django的事务处理。

这里有众多的Django事务处理的名词,例如:commit_on_success , commit_manually , commit_unless_maneged,rollback_unless_managed,enter_transaction_management,leace_transaction_management,这些只是其中的一部分而已。

最让人感到幸运的是,Django 1.6版本发布后。现在你可以紧紧使用几个函数就可以实现事务处理了,在我们进入学习这些函数的几秒之前。首先,我们要明确下面这些问题:

  •     什么是事务处理呢?
  •     Django 1.6中的出现了那些新的事物处理优先权"一个SQL事务(有时简称为事务)是一个可以原子性恢复的SQL语句执行序列"。换句话说,所有的SQL语句一起执行和提交。同样的,当回滚时,所有语句也一并回滚。

    例如:
     

    # START
    note = Note(title="my first note", text="Yay!")
    note = Note(title="my second note", text="Whee!")
    address1.save()
    address2.save()
    # COMMIT
    

    所以一个事务是 数据库中单个的工作单元。它是由一个start transaction开始和一个commit或者显式的rollback结束。

    Django 1.6之前的事务管理有什么问题"htmlcode">

    def register(request):
      user = None
      if request.method == 'POST':
        form = UserForm(request.POST)
        if form.is_valid():
     
          customer = Customer.create("subscription",
           email = form.cleaned_data['email'],
           description = form.cleaned_data['name'],
           card = form.cleaned_data['stripe_token'],
           plan="gold",
          )
     
          cd = form.cleaned_data      
          try:
            user = User.create(cd['name'], cd['email'], cd['password'],
              cd['last_4_digits'])
     
            if customer:
              user.stripe_id = customer.id
              user.save()
            else:
              UnpaidUsers(email=cd['email']).save()
     
          except IntegrityError:
            form.addError(cd['email'] + ' is already a member')
          else:
            request.session['user'] = user.pk
            return HttpResponseRedirect('/')
     
      else:
       form = UserForm()
     
      return render_to_response(
        'register.html',
        {
         'form': form,
         'months': range(1, 12),
         'publishable': settings.STRIPE_PUBLISHABLE,
         'soon': soon(),
         'user': user,
         'years': range(2011, 2036),
        },
        context_instance=RequestContext(request)
      )
    

    例子首先调用了Customer.create,实际上就是调用Stripe来处理信用卡进程,然后我们创建一个新用户。如果我们得到来自Stripe的响应,我们就用stripe_id更新新创建的用户。如果我们没有得到响应(Stripe已关闭),我们将用新创建用户的email向UnpaidUsers表增加一个新条目,这样我们可以让他们稍后重试他们信用卡信息。


    思路是这样的:如果Stripe没有响应,用户依然可以注册,然后开始使用我们的网站。我们将在稍后的时候让用户提供信用卡的信息。

    “我明白这是一个特殊的例子,并且这也不是我想完成的功能的方式,但是它的目的是展示交易”

    考虑交易,牢记住在Django1.6中提供了对于数据库的“AUTOCOMMIT”功能。接下来看一下数据库相关的代码:
     

    cd = form.cleaned_data
    try:
      user = User.create(cd['name'], cd['email'], cd['password'], cd['last_4_digits'])
     
      if customer:
        user.stripe_id = customer.id
        user.save()
      else:
        UnpaidUsers(email=cd['email']).save()
     
    except IntegrityError:
    

    你能发现问题了吗?如果“UnpaidUsers(email=cd['email']).save()” 运行失败,会发生什么?

    有一个用户,注册了系统;然后系统认为已经核对过信用卡了。但是事实上,系统并没有核对过。

    我们仅仅想得到其中一种结果:

    1.在数据库中创建了用户,并有了stripe_id

    2.在数据库中创建了用户,但是没有stripe_id。同时在相关的“UnpaidUsers”行,存有相同的邮件地址

    这就意味着,我们想要的是分开的数据库语句头完成任务或者回滚。这个例子很好的说明了这个交易。


    首先,我们写一些测试用例来验证事情是否按照我们想象的方式运行: 
     

    @mock.patch('payments.models.UnpaidUsers.save', side_effect = IntegrityError)
    def test_registering_user_when_strip_is_down_all_or_nothing(self, save_mock):
     
      #create the request used to test the view
      self.request.session = {}
      self.request.method='POST'
      self.request.POST = {'email' : 'python@rocks.com',
                 'name' : 'pyRock',
                 'stripe_token' : '...',
                 'last_4_digits' : '4242',
                 'password' : 'bad_password',
                 'ver_password' : 'bad_password',
                }    
     
      #mock out stripe and ask it to throw a connection error
      with mock.patch('stripe.Customer.create', side_effect =
              socket.error("can't connect to stripe")) as stripe_mock:
     
        #run the test
        resp = register(self.request)
     
        #assert there is no record in the database without stripe id.
        users = User.objects.filter(email="python@rocks.com")
        self.assertEquals(len(users), 0)
     
        #check the associated table also didn't get updated
        unpaid = UnpaidUsers.objects.filter(email="python@rocks.com")
        self.assertEquals(len(unpaid), 0)
    

    当我们尝试去保存“UnpaidUsers”,测试上方的解释器就会跑出异常'IntegrityError' 。

    接下来是解释这个问题的答案,“当“UnpaidUsers(email=cd['email']).save()”运行的时候到底发生了什么?” 下面一段代码创建了一段对话,我们需要在注册函数中给出一些合适的信息。然后“with mock.patch” 会强制系统去认为Stripe没响应,最终就跳到我们的测试用例中。

    resp = register(self.request)
    
    

    上面这段话仅仅是调用我们的注册视图去传递请求。然后我们仅仅需要去核对表是否有更新:
     

    #assert there is no record in the database without stripe_id.
    users = User.objects.filter(email="python@rocks.com")
    self.assertEquals(len(users), 0)
     
    #check the associated table also didn't get updated
    unpaid = UnpaidUsers.objects.filter(email="python@rocks.com")
    self.assertEquals(len(unpaid), 0)
    

    所以如果我们运行了测试用例,那么它就该运行失败:
     

    ======================================================================
    FAIL: test_registering_user_when_strip_is_down_all_or_nothing (tests.payments.testViews.RegisterPageTests)
    ----------------------------------------------------------------------
    Traceback (most recent call last):
     File "/Users/j1z0/.virtualenvs/django_1.6/lib/python2.7/site-packages/mock.py", line 1201, in patched
      return func(*args, **keywargs)
     File "/Users/j1z0/Code/RealPython/mvp_for_Adv_Python_Web_Book/tests/payments/testViews.py", line 266, in test_registering_user_when_strip_is_down_all_or_nothing
      self.assertEquals(len(users), 0)
    AssertionError: 1 != 0
     
    ----------------------------------------------------------------------
    

    赞。这就是我们最终想要的结果。

    记住:我们这里已经练习了“测试驱动开发”的能力。错误信息提示我们:用户信息已经被保存到数据库中,但是这个并不是我们想要的,因为我们并没有付费!

    事务交易用于挽救这样问题 ...

    事务

    对于Django1.6,有很多种方式来创建事务。

    这里简单介绍几种。
    推荐的方法

    依据Django1.6的文档,“Django提供了一种简单的API去控制数据库的事务交易...原子操作用来定义数据库事务的属性。原子操作允许我们在数据库保证的前提下,创建一堆代码。如果这些代码被成功的执行,所对应的改变也会提交到数据库中。如果有异常发生,那么操作就会回滚。”

    原子操作可以被用于解释操作或者是内容管理。所以如果我们用作为内容管理的时候,我们的注册函数的代码就会如下:
     

    from django.db import transaction
     
    try:
      with transaction.atomic():
        user = User.create(cd['name'], cd['email'], cd['password'], cd['last_4_digits'])
     
        if customer:
          user.stripe_id = customer.id
          user.save()
        else:
          UnpaidUsers(email=cd['email']).save()
     
    except IntegrityError:
      form.addError(cd['email'] + ' is already a member')
    

    注意在“with transaction.atomic()”这一行。这块代码将会在事务内部执行。所以如果我们重新运行了我们的测试,他们都将会通过。

    记住:事务是一个工作单元,所以当“UnpaidUsers”调用失败的时候,内容管理的所有操作都会被一起回滚。

    使用装饰器

    除了上面的做法,我们能使用Python的装饰器特性来使用事务。
     

    @transaction.atomic():
    def register(request):
      ...snip....
     
      try:
        user = User.create(cd['name'], cd['email'], cd['password'], cd['last_4_digits'])
     
        if customer:
          user.stripe_id = customer.id
          user.save()
        else:
            UnpaidUsers(email=cd['email']).save()
     
      except IntegrityError:
        form.addError(cd['email'] + ' is already a member')
    

    如果我们重新跑一次测试,那还是会一样失败。

    为啥呢?为啥事务没有正确回滚呢?原因在与transaction.atomic会尝试捕获某种异常,而我们代码里人肉捕抓了(例如 try-except 代码块里的IntegrityError 异常),所以 transaction.atomic 永远看不到这个异常,所以标准的AUTOCOMMIT 流程就此无效掉。

    但是,删掉try-catch语句会导致异常没有捕获,然后代码流程十有八九会就此乱掉。所以啊,也就是说不能去掉try-catch。
     


    所以,技巧是将原子上下文管理器放入我们在第一个解决方案中的 try-catch 代码段里。

    再看一下正确的代码:
     

    from django.db import transaction
     
    try:
      with transaction.atomic():
        user = User.create(cd['name'], cd['email'], cd['password'], cd['last_4_digits'])
     
        if customer:
          user.stripe_id = customer.id
          user.save()
        else:
          UnpaidUsers(email=cd['email']).save()
     
    except IntegrityError:
      form.addError(cd['email'] + ' is already a member')
    

    当 UnpaidUsers 触发 IntegrityError 时,上下文管理器 transaction.atomic() 会捕获到它,并执行回滚操作。此时我们的代码在异常处理中执行(即行 theform.addErrorline),将会完成回滚操作,并且,如果必要的话,也可以安全的进行数据库调用。也要注意:任何在上下文管理器 thetransaction.atomic() 前后的数据库调用都不会受到它的执行结果的影响。

    针对每次HTTP请求的事务交易

    Django1.5和1.6版本都允许用户操作请求事务模式。在这种模式下,Django会自动在事务中,处理你的视图函数。如果视图函数抛出异常,Django会自动回滚事务;否则Django会提交事务。

    为了实现这个功能,你需要在你想要有此功能的数据库的配置中,设置“ATOMIC_REQUEST”为真。所以在我们的“settings.py”需要有如下设置:
     

    DATABASES = {
      'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': os.path.join(SITE_ROOT, 'test.db'),
        'ATOMIC_REQUEST': True,
      }
    }
    

    如果我们把解释器放到视图函数中,以上设置就会生效。所以这并没有符合我们的想法。

    但是,这里依然值得注意的是解释器:“ATOMIC_REQUESTS”和“@transaction.atomic”仍然会有可能在有异常抛出的时候,处理这些错误。为了去捕捉这些错误,你需要去完成一些常规的中间件,或者需要去覆盖“urls.hadler500”,或者是创建新的“500.html”模板。

    保存点

    尽管事务是有原子性的,但还是能够打散为多个“保存点”——你可看作是“部分事务”。

    例如,你有个事务包含了4条SQL语句,你可以在第二个SQL之后创建一个保存点。一旦保存点创建成功,就算第三条或第四条SQL执行失败,你仍旧能够做一个部分回滚,忽视后面两条SQL,仅保留前面两条。

    基本上,这就像是提供了一个切割的能力:一个普通的事务能够被切分为多个更“轻量”的事务,然后能进行部分回滚或部分提交。

        但一定要注意,小心整个事务被无端回滚掉(例如由于抛出了IntegrityError异常但却没有捕抓,那所有的保存点都会因此被回滚掉的)

    来看个示例代码,了解怎么玩转保存点。

     

    @transaction.atomic()
    def save_points(self,save=True):
     
      user = User.create('jj','inception','jj','1234')
      sp1 = transaction.savepoint()
     
      user.name = 'zheli hui guadiao, T.T'
      user.stripe_id = 4
      user.save()
     
      if save:
        transaction.savepoint_commit(sp1)
      else:
        transaction.savepoint_rollback(sp1)
    

    示例中,整个函数都是属于一个事务的。新User对象创建后,我们创建并得到了一个保存点。然后后续3行语句——
     

      user.name = 'zheli hui guadiao, T.T'
      user.stripe_id = 4
      user.save()
    

    ——不属于刚才的保存点,因此他们有可能是属于下面的savepoint_rollback或savepoint_commit的一部分。假设是savepoint_rollback, 那代码行user = User.create('jj','inception','jj','1234')仍旧会成功提交到数据库中 ,而下面三行则不会成功。
     

    采用另外一种方法,下面的两种测试用例描述了保存点是如何工作的:

     

    def test_savepoint_rollbacks(self):
     
      self.save_points(False)
     
      #verify that everything was stored
      users = User.objects.filter(email="inception")
      self.assertEquals(len(users), 1)
     
      #note the values here are from the original create call
      self.assertEquals(users[0].stripe_id, '')
      self.assertEquals(users[0].name, 'jj')
     
     
    def test_savepoint_commit(self):
      self.save_points(True)
     
      #verify that everything was stored
      users = User.objects.filter(email="inception")
      self.assertEquals(len(users), 1)
     
      #note the values here are from the update calls
      self.assertEquals(users[0].stripe_id, '4')
      self.assertEquals(users[0].name, 'starting down the rabbit hole')
    

    同样,在我们提交或者回滚保存点之后,我们仍然可以继续在同一个事务中工作。同时,这个运行结果不受之前保存点输出结果的影响。

    例如,如果我们按照如下例子更新“save_points”函数,

     

    @transaction.atomic()
    def save_points(self,save=True):
     
      user = User.create('jj','inception','jj','1234')
      sp1 = transaction.savepoint()
     
      user.name = 'starting down the rabbit hole'
      user.save()
     
      user.stripe_id = 4
      user.save()
     
      if save:
        transaction.savepoint_commit(sp1)
      else:
        transaction.savepoint_rollback(sp1)
     
      user.create('limbo','illbehere@forever','mind blown',
          '1111')
    

    即使无论是“savepoint_commit”或者“savepoint_rollback”被“limbo”这个用户调用了,这个事务仍然会被成功创建。如果没有创建成功,整个事务将会被回滚。

    嵌套事务

    采用“savepoint()”,“savepoint_commit”和“savepoint_rollback”去手动指定保存点,将会自动一个嵌套事务,同时这个嵌套事务会自动为我们创建一个保存点。并且,如果我们遇到错误,这个事务将会回滚。

    下面用一个扩展的例子来说明:
     

    @transaction.atomic()
    def save_points(self,save=True):
     
      user = User.create('jj','inception','jj','1234')
      sp1 = transaction.savepoint()
     
      user.name = 'starting down the rabbit hole'
      user.save()
     
      user.stripe_id = 4
      user.save()
     
      if save:
        transaction.savepoint_commit(sp1)
      else:
        transaction.savepoint_rollback(sp1)
     
      try:
        with transaction.atomic():
          user.create('limbo','illbehere@forever','mind blown',
              '1111')
          if not save: raise DatabaseError
      except DatabaseError:
        pass
    

    这里我们可以看到:在我们处理保存点之后,我们采用“thetransaction.atomic”的上下文管理区擦出我们创建的"limbo"这个用户。当上下文管理被调用的时候,它会创建一个保存点(因为我们已经在事务里面了),同时这个保存点将会依据已经存在的上下文管理器去被执行或者回滚。

    这样下面两个测试用例就描述了这个行文:

     
     

    def test_savepoint_rollbacks(self):
     
         self.save_points(False)
     
        #verify that everything was stored
        users = User.objects.filter(email="inception")
        self.assertEquals(len(users), 1)
     
        #savepoint was rolled back so we should have original values
        self.assertEquals(users[0].stripe_id, '')
        self.assertEquals(users[0].name, 'jj')
     
        #this save point was rolled back because of DatabaseError
        limbo = User.objects.filter(email="illbehere@forever")
        self.assertEquals(len(limbo),0)
     
      def test_savepoint_commit(self):
        self.save_points(True)
     
        #verify that everything was stored
        users = User.objects.filter(email="inception")
        self.assertEquals(len(users), 1)
     
        #savepoint was committed
        self.assertEquals(users[0].stripe_id, '4')
        self.assertEquals(users[0].name, 'starting down the rabbit hole')
     
        #save point was committed by exiting the context_manager without an exception
        limbo = User.objects.filter(email="illbehere@forever")
        self.assertEquals(len(limbo),1)
    


    因此,在现实之中你可以使用原子或者在事务之中创建保存点的保存点。使用原子,你不必要很仔细地担心提交和会滚,当这种情况发生时,你可以完全控制其中的保存点。
    结论

    如果你有任何以往使用Django更早版本事务处理的经验,你可以看到很多更简单地事务处理模型。如下,在默认情况下,也有自动提交功能,它是一个很好的例子,即Django与python两者都引以为豪所提供的“理智的”默认值。对于如此多的系统,你将不需要直接地来处理事务。只是让“自动提交功能”来完成其工作,但如果你这样做,我将希望这个帖子能提供你所需要像专家一样在Django之中管理的事务处理。