为每个用户分配各个数据表的读、写、删除权限,其实不单单是企业开发需要,一般系统同样需要。但是在企业应用中我们还需要操作跟踪(审计),即监督那些白名单中的用户行为。比如,有删除权限的用户删除了某条销售记录,而我想知道是谁干的...
下面,我就谈谈目前我采取的方法,
1/方法一 txt格式的log文件
记录格式
2011-07-21 15:54:22,020 INFO views.customer_add Line:116>>User,ghiewa -Action,Add -Table,vendor_customer
2011-07-21 15:54:22,020 INFO views.customer_add Line:116>>User,ghiewa -Action,Add -Table,vendor_customer
首先配置log,
若使用的是 Django 1.3 之前的版本,需要手动配置(在 settings 文件末尾添加):
# Attach a nightly-rotating file handler to the root logger.
root = logging.getLogger()
# The settings module may be imported more than once. Only install the handler
# when the root logger has none; otherwise each import would add another
# handler and every record would be written several times.
if not root.handlers:
    log_path = os.path.join(PROJECT_ROOT, 'log.log')
    log_format = '%(asctime)s %(levelname)s %(module)s.%(funcName)s Line:%(lineno)d%(message)s'
    # Positional arguments: when="midnight", interval=1, backupCount=90.
    hdlr = handlers.TimedRotatingFileHandler(log_path, "midnight", 1, 90)
    hdlr.setFormatter(logging.Formatter(log_format))
    root.addHandler(hdlr)
    root.setLevel(logging.INFO)
然后,
在views.py中
import logging
在具体model 操作语句后添加
# Pass the values as logger arguments so the message is only formatted when
# the record is actually emitted; the resulting text is unchanged.
logging.info('>>User,%s -Action,%s -Table,%s', request.user.username, 'Add', 'vendor_contact')
--附--
若是1.3版本
import logging
# Module-level logger, named after the module that defines it.
logger = logging.getLogger(__name__)


def my_view(request, arg1, arg):
    """Example view that reports a failure through the module logger."""
    ...
    if bad_mojo:
        # Record the failure at ERROR level.
        logger.error('Something went wrong!')
2/ 方法二、 使用simple_history APP (github.com上获取代码)
将该APP放置到 Python 可以找到的目录下(即位于 sys.path 中),比如我放置在当前工程的ext文件夹下
在Model定义文件中
from ext.simple_history.models import HistoricalRecords
class Test(models.Model):
    ...
    # The class name must match the imported HistoricalRecords; the previous
    # spelling "HistorialRecords" raised NameError when the model was loaded.
    history = HistoricalRecords()
如此即可
缺点:没有记录操作者是谁;另外会多出许多 HistoricalXXX 表
方法3,编写自己的代码 利用Django Admin的数据表
#core/history.py
from django.contrib.auth.models import User
from middleware import threadlocals
from django.contrib.contenttypes.models import ContentType
from django.contrib.admin.models import ADDITION
from django.contrib.admin.models import CHANGE
from django.contrib.admin.models import LogEntry
from django.db import models
from django.contrib.admin.models import DELETION
from django.db.models import signals
from django.db.models.base import ModelBase
import datetime
def prepare_fields(instance):
    """Return a text snapshot of the history-tracked fields of ``instance``.

    The field names come from ``instance._history['fields']``. Each name is
    mapped to the ``unicode`` form of the value read from the field's
    ``attname`` attribute; ``None`` is stored as an empty string.

    Side effect: ``instance._history_fields`` is reset to an empty dict.

    Raises ``KeyError`` when a listed name is not among the fields in
    ``instance.__class__._meta.fields``.
    """
    class_fields = instance.__class__._meta.fields
    # Kept from the original implementation: the cached snapshot on the
    # instance is cleared every time a new snapshot is computed.
    instance._history_fields = {}
    # Named lookup table (the old local name ``all`` shadowed the builtin).
    fields_by_name = dict((f.name, f) for f in class_fields)
    output = {}
    for field_name in instance._history['fields']:
        modelfield = fields_by_name[field_name]
        value = getattr(instance, modelfield.attname)
        output[field_name] = unicode('' if value is None else value)
    return output
def add_signals(cls):
    """Connect the history-recording signal handlers for model class ``cls``.

    Three handlers are registered with ``sender=cls``:

    * ``pre_save`` stores a baseline snapshot of the tracked fields in
      ``instance._history_fields`` and chooses ADDITION or CHANGE for
      ``instance._history_action``.
    * ``post_save`` writes one field log entry per tracked field whose text
      value differs from the baseline, then the model-level log entry.
    * ``post_delete`` writes a DELETION log entry.

    Which parts run is controlled by the ``'fields'`` and ``'model'`` keys of
    ``instance._history``.
    """

    def post_delete(instance, **_kwargs):
        if instance._history.get('model', False):
            instance._create_log_entry(DELETION)

    def pre_save(instance, **_kwargs):
        if instance._history.get('fields', []):
            if instance.pk is None:
                # Unsaved object: every tracked field starts from ''.
                instance._history_fields = {}
                for field_name in instance._history['fields']:
                    instance._history_fields[field_name] = ''
            else:
                try:
                    db_instance = instance.__class__.objects.get(pk=instance.pk)
                except instance.__class__.DoesNotExist:
                    # No stored row for this pk: the baseline becomes the
                    # values that are about to be saved.
                    db_instance = instance
                instance._history_fields = prepare_fields(db_instance)
        if instance._history.get('model', False):
            # Identity test instead of ``== None`` so a custom __eq__ on the
            # primary key value cannot affect the decision.
            if instance.pk is None:
                instance._history_action = ADDITION
            else:
                instance._history_action = CHANGE

    def post_save(instance, **_kwargs):
        if instance._history.get('fields', []):
            # Grab the baseline first: prepare_fields() resets the attribute.
            pre_fields = instance._history_fields
            post_fields = prepare_fields(instance)
            for name, after in post_fields.iteritems():
                if pre_fields[name] != after:
                    instance._create_field_log_entry(name, after)
        if instance._history.get('model', False):
            instance._create_log_entry(instance._history_action)

    # weak=False: the handlers are local closures, so the dispatcher must hold
    # strong references or they would vanish once this function returns.
    signals.pre_save.connect(pre_save, sender=cls, weak=False)
    signals.post_save.connect(post_save, sender=cls, weak=False)
    signals.post_delete.connect(post_delete, sender=cls, weak=False)
class ModelWithHistoryBase(ModelBase):
    """Metaclass that reads a model's ``History`` options and connects signals.

    For every class it creates, the options found on the class's ``History``
    attribute (if any) are stored as a dict in ``Model._history`` with the
    keys ``'model'`` (default ``False``) and ``'fields'`` (default ``[]``),
    and ``add_signals`` is called for the new class.
    """

    def __new__(cls, name, bases, attrs):
        Model = ModelBase.__new__(cls, name, bases, attrs)
        options = getattr(Model, 'History', None)
        if options:
            # Work on a copy. Writing into History.__dict__ directly modifies
            # the options class itself and fails outright when __dict__ is a
            # read-only proxy (new-style classes).
            history = dict(options.__dict__)
        else:
            # No History declared: history recording stays switched off.
            history = {}
        history.setdefault('model', False)
        history.setdefault('fields', [])
        Model._history = history
        add_signals(Model)
        return Model
class ModelWithHistory(models.Model):
    """Abstract base model whose changes are recorded as log entries.

    Model-level actions are stored as ``LogEntry`` rows and per-field changes
    as ``AttributeLogEntry`` rows. What is recorded is decided by the
    ``_history`` dict that the metaclass attaches to each subclass.
    """
    __metaclass__ = ModelWithHistoryBase

    class Meta:
        abstract = True

    def _history_user(self):
        """Return the user that a new log entry is attributed to."""
        user = threadlocals.get_current_user()
        if user.is_anonymous():
            # Anonymous requests are attributed to the User row with pk=0,
            # which must exist or User.DoesNotExist is raised.
            return User.objects.get(pk=0)
        return user

    def _history_repr(self):
        """Return ``repr(self)``, or ``"(unknown)"`` when repr() fails."""
        try:
            return repr(self)
        except Exception:
            return "(unknown)"

    def _latest_log_entry(self):
        """Return the first entry of ``get_history()``, or ``None`` if empty."""
        # NOTE(review): "first" is only the most recent entry if LogEntry's
        # default ordering is newest-first; confirm.
        entries = list(self.get_history()[:1])
        if entries:
            return entries[0]
        return None

    def _create_log_entry(self, action):
        """Save a ``LogEntry`` for this object with the given action flag."""
        history = LogEntry(
            user=self._history_user(),
            object_id=self.pk,
            action_flag=action,
            content_type=ContentType.objects.get_for_model(self),
        )
        history.object_repr = self._history_repr()
        history.save()

    def _create_field_log_entry(self, name, value):
        """Save an ``AttributeLogEntry`` holding the new ``value`` of field ``name``."""
        user = self._history_user()
        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid a circular import with core.models; confirm.
        from core.models import AttributeLogEntry
        history = AttributeLogEntry(
            user=user,
            object_id=self.pk,
            field_name=name,
            field_value=value,
            content_type=ContentType.objects.get_for_model(self),
        )
        # NOTE(review): set as a plain attribute; confirm AttributeLogEntry
        # has an object_repr field if this value is meant to be stored.
        history.object_repr = self._history_repr()
        history.save()

    def get_history(self):
        """Return the ``LogEntry`` queryset for this object."""
        content_type = ContentType.objects.get_for_model(self)
        return LogEntry.objects.filter(object_id=self.pk, content_type=content_type)

    def has_history(self):
        """Return True when model-level history is enabled for this class."""
        return bool(self.__class__._history.get('model', False))

    def last_edited_at(self):
        """Return the action time of the first history entry.

        Falls back to 2000-01-01 00:00:00 when there is no entry.
        """
        entry = self._latest_log_entry()
        if entry is None:
            return datetime.datetime(2000, 1, 1, 0, 0, 0)
        return entry.action_time

    def last_edited_by(self):
        """Return the user of the first history entry.

        Falls back to the User row with pk=1 when there is no entry.
        """
        entry = self._latest_log_entry()
        if entry is None:
            return User.objects.get(pk=1)
        return entry.user
#core/models.py
from django.db import models
from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType
from django.utils.safestring import mark_safe
from django.contrib.admin.util import quote
from django.utils.encoding import smart_unicode
from django.utils.translation import ugettext_lazy as _
from django.utils.datetime_safe import strftime
import datetime
#almost dupe of LogEntry model
class AttributeLogEntry(models.Model):
    """One recorded value of a single field of some model instance.

    The target object is identified by ``content_type`` plus ``object_id``;
    ``field_name`` and ``field_value`` hold the field and the logged value.
    """

    class Meta:
        verbose_name = _('attribute log entry')
        verbose_name_plural = _('attribute log entries')
        db_table = 'django_attribute_log'
        ordering = ('-action_time',)

    action_time = models.DateTimeField(_('action time'), auto_now=True)
    user = models.ForeignKey(User)
    content_type = models.ForeignKey(ContentType, blank=True, null=True)
    object_id = models.IntegerField(_('object id'), blank=True, null=True)
    field_name = models.CharField(_('field name'), max_length=200, blank=True, null=True)
    field_value = models.TextField(_('field value'), null=True, blank=True)

    def __repr__(self):
        """Represent the entry by its action time."""
        return smart_unicode(self.action_time)

    @staticmethod
    def get_history(obj, field):
        """Return the entries logged for ``field`` of ``obj`` as a queryset."""
        obj_type = ContentType.objects.get_for_model(obj)
        entries = AttributeLogEntry.objects.filter(
            object_id=obj.pk,
            field_name=field,
            content_type=obj_type,
        )
        return entries

    def get_edited_object(self):
        """Return the object this entry refers to."""
        target_type = self.content_type
        return target_type.get_object_for_this_type(pk=self.object_id)

    def get_admin_url(self):
        """Return the admin URL for editing the object of this entry.

        The URL is relative to the Django admin index page.
        """
        target_type = self.content_type
        parts = (target_type.app_label, target_type.model, quote(self.object_id))
        return mark_safe(u"%s/%s/%s/" % parts)

    @staticmethod
    def last_edited_at(obj, field):
        """Return the formatted action time of the first entry, or None."""
        first = list(AttributeLogEntry.get_history(obj, field)[:1])
        if first:
            return strftime(first[0].action_time, "%Y-%m-%d %H:%M")
        return None

    @staticmethod
    def last_edited_by(obj, field):
        """Return the user of the first entry, or None."""
        first = list(AttributeLogEntry.get_history(obj, field)[:1])
        if first:
            return first[0].user
        return None
使用方法
class MyModel(ModelWithHistory):
    """Example model with both model-level and per-field history enabled."""

    f1 = CharField(max_length=100)
    f2 = IntegerField()

    class History:
        # Record changes of the model in the admin's LogEntry table.
        model = True
        # Record the history of these fields in the AttributeLogEntry table.
        fields = ('f1', 'f2')
总结,
比较后,你会发现方法二和方法三用了类似的思路,即自动记录,并将结果记录在数据库中。但是显然方法二的代码工作量更少,与本身系统的耦合度非常低,是推荐方法;其缺陷是不能告诉我们是谁的动作。
2011.08.28 15:43修改