This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
authentic2-auth-msp/src/authentic2_auth_msp/models.py

78 lines
2.7 KiB
Python

import requests
import json
import logging
import urlparse
from requests_oauthlib import OAuth2Session
from django.db import models, transaction
from django.db.models.query import Q
from django.contrib.auth import get_user_model
from django.utils.translation import ugettext_lazy as _
from . import app_settings
class MspAccountManager(models.Manager):
def cleanup(self):
logger = logging.getLogger(__name__)
for msp_account in self.filter(Q(user__isnull=True)
|Q(user__deleteduser__isnull=False)):
try:
with transaction.commit_on_success():
if msp_account.refresh_token():
if msp_account.token:
self.api_call('app/rest/agc', method='delete')
msp_account.delete()
except:
logger.exception('unable to delete msp account %s', msp_account)
class MspAccount(models.Model):
user = models.OneToOneField(get_user_model(),
verbose_name=_('user'),
default=None,
null=True,
on_delete=models.SET_NULL)
agc = models.CharField(max_length=64, verbose_name=_('access grant code'))
token = models.TextField(verbose_name=_('access token'))
objects = MspAccountManager()
def api_call(self, api_path, method='get', **kwargs):
url = urlparse.urljoin(app_settings.api_url, api_path)
session = OAuth2Session(app_settings.client_id,
token=self.token)
return getattr(session, method)(url,
verify=app_settings.verify_certificate,
cert=app_settings.client_certificate, **kwargs)
def refresh_token(self):
logger = logging.getLogger(__name__)
if not self.token:
return True
token = json.loads(self.token)
data = {
'grant_type': 'refresh_token',
'refresh_token': token['refresh_token'],
'client_id': app_settings.client_id,
'client_secret': app_settings.client_secret,
}
response = requests.post(app_settings.token_url,
data=data, verify=app_settings.verify_certificate,
cert=app_settings.client_certificate)
new_token = response.json()
if 'error' in new_token:
if new_token['error'] == 'invalid_grant':
logger.warning('obsolete token %r, deleting MspAccount %r', self.token,
self.agc)
self.delete()
return False
return True
else:
self.token = json.dumps(new_token)
self.save()
return True