debian-pywebpush/pywebpush/__init__.py

492 lines
18 KiB
Python

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import base64
from copy import deepcopy
import json
import os
import time
try:
from urllib.parse import urlparse
except ImportError: # pragma nocover
from urlparse import urlparse
import six
import http_ece
import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization
from py_vapid import Vapid, Vapid01
class WebPushException(Exception):
"""Web Push failure.
This may contain the requests.Response
"""
def __init__(self, message, response=None):
self.message = message
self.response = response
def __str__(self):
extra = ""
if self.response:
try:
extra = ", Response {}".format(
self.response.text,
)
except AttributeError:
extra = ", Response {}".format(self.response)
return "WebPushException: {}{}".format(self.message, extra)
class CaseInsensitiveDict(dict):
"""A dictionary that has case-insensitive keys"""
def __init__(self, data={}, **kwargs):
for key in data:
dict.__setitem__(self, key.lower(), data[key])
self.update(kwargs)
def __contains__(self, key):
return dict.__contains__(self, key.lower())
def __setitem__(self, key, value):
dict.__setitem__(self, key.lower(), value)
def __getitem__(self, key):
return dict.__getitem__(self, key.lower())
def __delitem__(self, key):
dict.__delitem__(self, key.lower())
def get(self, key, default=None):
try:
return self.__getitem__(key)
except KeyError:
return default
def update(self, data):
for key in data:
self.__setitem__(key, data[key])
class WebPusher:
"""WebPusher encrypts a data block using HTTP Encrypted Content Encoding
for WebPush.
See https://tools.ietf.org/html/draft-ietf-webpush-protocol-04
for the current specification, and
https://developer.mozilla.org/en-US/docs/Web/API/Push_API for an
overview of Web Push.
Example of use:
The javascript promise handler for PushManager.subscribe()
receives a subscription_info object. subscription_info.getJSON()
will return a JSON representation.
(e.g.
.. code-block:: javascript
subscription_info.getJSON() ==
{"endpoint": "https://push.server.com/...",
"keys":{"auth": "...", "p256dh": "..."}
}
)
This subscription_info block can be stored.
To send a subscription update:
.. code-block:: python
# Optional
# headers = py_vapid.sign({"aud": "https://push.server.com/",
"sub": "mailto:your_admin@your.site.com"})
data = "Mary had a little lamb, with a nice mint jelly"
WebPusher(subscription_info).send(data, headers)
"""
subscription_info = {}
valid_encodings = [
# "aesgcm128", # this is draft-0, but DO NOT USE.
"aesgcm", # draft-httpbis-encryption-encoding-01
"aes128gcm" # RFC8188 Standard encoding
]
verbose = False
def __init__(self, subscription_info, requests_session=None,
verbose=False):
"""Initialize using the info provided by the client PushSubscription
object (See
https://developer.mozilla.org/en-US/docs/Web/API/PushManager/subscribe)
:param subscription_info: a dict containing the subscription_info from
the client.
:type subscription_info: dict
:param requests_session: a requests.Session object to optimize requests
to the same client.
:type requests_session: requests.Session
:param verbose: provide verbose feedback
:type verbose: bool
"""
self.verbose = verbose
if requests_session is None:
self.requests_method = requests
else:
self.requests_method = requests_session
if 'endpoint' not in subscription_info:
raise WebPushException("subscription_info missing endpoint URL")
self.subscription_info = deepcopy(subscription_info)
self.auth_key = self.receiver_key = None
if 'keys' in subscription_info:
keys = self.subscription_info['keys']
for k in ['p256dh', 'auth']:
if keys.get(k) is None:
raise WebPushException("Missing keys value: {}".format(k))
if isinstance(keys[k], six.text_type):
keys[k] = bytes(keys[k].encode('utf8'))
receiver_raw = base64.urlsafe_b64decode(
self._repad(keys['p256dh']))
if len(receiver_raw) != 65 and receiver_raw[0] != "\x04":
raise WebPushException("Invalid p256dh key specified")
self.receiver_key = receiver_raw
self.auth_key = base64.urlsafe_b64decode(
self._repad(keys['auth']))
def verb(self, msg, *args, **kwargs):
if self.verbose:
print(msg.format(*args, **kwargs))
def _repad(self, data):
"""Add base64 padding to the end of a string, if required"""
return data + b"===="[:len(data) % 4]
def encode(self, data, content_encoding="aes128gcm"):
"""Encrypt the data.
:param data: A serialized block of byte data (String, JSON, bit array,
etc.) Make sure that whatever you send, your client knows how
to understand it.
:type data: str
:param content_encoding: The content_encoding type to use to encrypt
the data. Defaults to RFC8188 "aes128gcm". The previous draft-01 is
"aesgcm", however this format is now deprecated.
:type content_encoding: enum("aesgcm", "aes128gcm")
"""
# Salt is a random 16 byte array.
if not data:
self.verb("No data found...")
return
if not self.auth_key or not self.receiver_key:
raise WebPushException("No keys specified in subscription info")
self.verb("Encoding data...")
salt = None
if content_encoding not in self.valid_encodings:
raise WebPushException("Invalid content encoding specified. "
"Select from " +
json.dumps(self.valid_encodings))
if content_encoding == "aesgcm":
self.verb("Generating salt for aesgcm...")
salt = os.urandom(16)
# The server key is an ephemeral ECDH key used only for this
# transaction
server_key = ec.generate_private_key(ec.SECP256R1, default_backend())
crypto_key = server_key.public_key().public_bytes(
encoding=serialization.Encoding.X962,
format=serialization.PublicFormat.UncompressedPoint
)
if isinstance(data, six.text_type):
data = bytes(data.encode('utf8'))
if content_encoding == "aes128gcm":
self.verb("Encrypting to aes128gcm...")
encrypted = http_ece.encrypt(
data,
salt=salt,
private_key=server_key,
dh=self.receiver_key,
auth_secret=self.auth_key,
version=content_encoding)
reply = CaseInsensitiveDict({
'body': encrypted
})
else:
self.verb("Encrypting to aesgcm...")
crypto_key = base64.urlsafe_b64encode(crypto_key).strip(b'=')
encrypted = http_ece.encrypt(
data,
salt=salt,
private_key=server_key,
keyid=crypto_key.decode(),
dh=self.receiver_key,
auth_secret=self.auth_key,
version=content_encoding)
reply = CaseInsensitiveDict({
'crypto_key': crypto_key,
'body': encrypted,
})
if salt:
reply['salt'] = base64.urlsafe_b64encode(salt).strip(b'=')
return reply
def as_curl(self, endpoint, encoded_data, headers):
"""Return the send as a curl command.
Useful for debugging. This will write out the encoded data to a local
file named `encrypted.data`
:param endpoint: Push service endpoint URL
:type endpoint: basestring
:param encoded_data: byte array of encoded data
:type encoded_data: bytearray
:param headers: Additional headers for the send
:type headers: dict
:returns string
"""
header_list = [
'-H "{}: {}" \\ \n'.format(
key.lower(), val) for key, val in headers.items()
]
data = ""
if encoded_data:
with open("encrypted.data", "wb") as f:
f.write(encoded_data)
data = "--data-binary @encrypted.data"
if 'content-length' not in headers:
self.verb("Generating content-length header...")
header_list.append(
'-H "content-length: {}" \\ \n'.format(len(encoded_data)))
return ("""curl -vX POST {url} \\\n{headers}{data}""".format(
url=endpoint, headers="".join(header_list), data=data))
def send(self, data=None, headers=None, ttl=0, gcm_key=None, reg_id=None,
content_encoding="aes128gcm", curl=False, timeout=None):
"""Encode and send the data to the Push Service.
:param data: A serialized block of data (see encode() ).
:type data: str
:param headers: A dictionary containing any additional HTTP headers.
:type headers: dict
:param ttl: The Time To Live in seconds for this message if the
recipient is not online. (Defaults to "0", which discards the
message immediately if the recipient is unavailable.)
:type ttl: int
:param gcm_key: API key obtained from the Google Developer Console.
Needed if endpoint is https://android.googleapis.com/gcm/send
:type gcm_key: string
:param reg_id: registration id of the recipient. If not provided,
it will be extracted from the endpoint.
:type reg_id: str
:param content_encoding: ECE content encoding (defaults to "aes128gcm")
:type content_encoding: str
:param curl: Display output as `curl` command instead of sending
:type curl: bool
:param timeout: POST requests timeout
:type timeout: float or tuple
"""
# Encode the data.
if headers is None:
headers = dict()
encoded = {}
headers = CaseInsensitiveDict(headers)
if data:
encoded = self.encode(data, content_encoding)
if "crypto_key" in encoded:
# Append the p256dh to the end of any existing crypto-key
crypto_key = headers.get("crypto-key", "")
if crypto_key:
# due to some confusion by a push service provider, we
# should use ';' instead of ',' to append the headers.
# see
# https://github.com/webpush-wg/webpush-encryption/issues/6
crypto_key += ';'
crypto_key += (
"dh=" + encoded["crypto_key"].decode('utf8'))
headers.update({
'crypto-key': crypto_key
})
if "salt" in encoded:
headers.update({
'encryption': "salt=" + encoded['salt'].decode('utf8')
})
headers.update({
'content-encoding': content_encoding,
})
if gcm_key:
# guess if it is a legacy GCM project key or actual FCM key
# gcm keys are all about 40 chars (use 100 for confidence),
# fcm keys are 153-175 chars
if len(gcm_key) < 100:
self.verb("Guessing this is legacy GCM...")
endpoint = 'https://android.googleapis.com/gcm/send'
else:
self.verb("Guessing this is FCM...")
endpoint = 'https://fcm.googleapis.com/fcm/send'
reg_ids = []
if not reg_id:
reg_id = self.subscription_info['endpoint'].rsplit('/', 1)[-1]
self.verb("Fetching out registration id: {}", reg_id)
reg_ids.append(reg_id)
gcm_data = dict()
gcm_data['registration_ids'] = reg_ids
if data:
gcm_data['raw_data'] = base64.b64encode(
encoded.get('body')).decode('utf8')
gcm_data['time_to_live'] = int(
headers['ttl'] if 'ttl' in headers else ttl)
encoded_data = json.dumps(gcm_data)
headers.update({
'Authorization': 'key='+gcm_key,
'Content-Type': 'application/json',
})
else:
encoded_data = encoded.get('body')
endpoint = self.subscription_info['endpoint']
if 'ttl' not in headers or ttl:
self.verb("Generating TTL of 0...")
headers['ttl'] = str(ttl or 0)
# Additionally useful headers:
# Authorization / Crypto-Key (VAPID headers)
if curl:
return self.as_curl(endpoint, encoded_data, headers)
self.verb("\nSending request to"
"\n\thost: {}\n\theaders: {}\n\tdata: {}",
endpoint, headers, encoded_data)
resp = self.requests_method.post(endpoint,
data=encoded_data,
headers=headers,
timeout=timeout)
self.verb("\nResponse:\n\tcode: {}\n\tbody: {}\n",
resp.status_code, resp.text or "Empty")
return resp
def webpush(subscription_info,
data=None,
vapid_private_key=None,
vapid_claims=None,
content_encoding="aes128gcm",
curl=False,
timeout=None,
ttl=0,
verbose=False,
headers=None,
requests_session=None):
"""
One call solution to endcode and send `data` to the endpoint
contained in `subscription_info` using optional VAPID auth headers.
in example:
.. code-block:: python
from pywebpush import python
webpush(
subscription_info={
"endpoint": "https://push.example.com/v1/abcd",
"keys": {"p256dh": "0123abcd...",
"auth": "001122..."}
},
data="Mary had a little lamb, with a nice mint jelly",
vapid_private_key="path/to/key.pem",
vapid_claims={"sub": "YourNameHere@example.com"}
)
No additional method call is required. Any non-success will throw a
`WebPushException`.
:param subscription_info: Provided by the client call
:type subscription_info: dict
:param data: Serialized data to send
:type data: str
:param vapid_private_key: Vapid instance or path to vapid private key PEM \
or encoded str
:type vapid_private_key: Union[Vapid, str]
:param vapid_claims: Dictionary of claims ('sub' required)
:type vapid_claims: dict
:param content_encoding: Optional content type string
:type content_encoding: str
:param curl: Return as "curl" string instead of sending
:type curl: bool
:param timeout: POST requests timeout
:type timeout: float or tuple
:param ttl: Time To Live
:type ttl: int
:param verbose: Provide verbose feedback
:type verbose: bool
:return requests.Response or string
:param headers: Dictionary of extra HTTP headers to include
:type headers: dict
"""
if headers is None:
headers = dict()
else:
# Ensure we don't leak VAPID headers by mutating the passed in dict.
headers = headers.copy()
vapid_headers = None
if vapid_claims:
if verbose:
print("Generating VAPID headers...")
if not vapid_claims.get('aud'):
url = urlparse(subscription_info.get('endpoint'))
aud = "{}://{}".format(url.scheme, url.netloc)
vapid_claims['aud'] = aud
# Remember, passed structures are mutable in python.
# It's possible that a previously set `exp` field is no longer valid.
if (not vapid_claims.get('exp')
or vapid_claims.get('exp') < int(time.time())):
# encryption lives for 12 hours
vapid_claims['exp'] = int(time.time()) + (12 * 60 * 60)
if verbose:
print("Setting VAPID expry to {}...".format(
vapid_claims['exp']))
if not vapid_private_key:
raise WebPushException("VAPID dict missing 'private_key'")
if isinstance(vapid_private_key, Vapid01):
vv = vapid_private_key
elif os.path.isfile(vapid_private_key):
# Presume that key from file is handled correctly by
# py_vapid.
vv = Vapid.from_file(
private_key_file=vapid_private_key) # pragma no cover
else:
vv = Vapid.from_string(private_key=vapid_private_key)
if verbose:
print("\t claims: {}".format(vapid_claims))
vapid_headers = vv.sign(vapid_claims)
if verbose:
print("\t headers: {}".format(vapid_headers))
headers.update(vapid_headers)
response = WebPusher(
subscription_info, requests_session=requests_session, verbose=verbose
).send(
data,
headers,
ttl=ttl,
content_encoding=content_encoding,
curl=curl,
timeout=timeout,
)
if not curl and response.status_code > 202:
raise WebPushException("Push failed: {} {}\nResponse body:{}".format(
response.status_code, response.reason, response.text),
response=response)
return response