Merge branch 'wip/rebase-on-1.14.0' into wip/merge-1.14.0

This commit is contained in:
Benjamin Dauvergne 2024-01-24 16:17:31 +01:00
commit fa3bff4f50
13 changed files with 536 additions and 262 deletions

87
.circleci/config.yml Normal file
View File

@ -0,0 +1,87 @@
# These environment variables must be set in CircleCI UI
#
# DOCKERHUB_REPO - docker hub repo, format: <username>/<repo>
# DOCKER_USER - login info for docker hub
# DOCKER_PASS
#
version: 2
jobs:
build:
docker:
- image: docker:18.02.0-ce
working_directory: /dockerflow
steps:
# workaround circleci's fallback git's "object not found" error w/ tag
# builds
- run:
name: Install Docker build dependencies
command: apk add --no-cache openssh-client git
- checkout
- setup_remote_docker
- run:
name: Build Docker image
command: docker build -t app:build .
# save the built docker container into CircleCI's cache. This is
# required since Workflows do not have the same remote docker instance.
- run:
name: docker save app:build
command: mkdir -p /cache; docker save -o /cache/docker.tar "app:build"
- save_cache:
key: v1-{{ .Branch }}-{{ .Environment.CIRCLE_TAG }}-{{ epoch }}
paths:
- /cache/docker.tar
deploy:
docker:
- image: docker:18.02.0-ce
steps:
- setup_remote_docker
- restore_cache:
key: v1-{{.Branch}}
- run:
name: Restore Docker image cache
command: docker load -i /cache/docker.tar
- run:
name: Deploy to Dockerhub
command: |
if [ "${CIRCLE_BRANCH}" == "main" ]; then
DOCKER_TAG="latest"
fi
if echo "${CIRCLE_BRANCH}" | grep '^feature\..*' > /dev/null; then
DOCKER_TAG="${CIRCLE_BRANCH}"
fi
if [ -n "${CIRCLE_TAG}" ]; then
DOCKER_TAG="$CIRCLE_TAG"
fi
if [ -n "${DOCKER_TAG}" ]; then
echo "$DOCKER_PASS" | docker login -u "$DOCKER_USER" --password-stdin
echo ${DOCKERHUB_REPO}:${DOCKER_TAG}
docker tag app:build ${DOCKERHUB_REPO}:${DOCKER_TAG}
docker images
docker push "${DOCKERHUB_REPO}:${DOCKER_TAG}"
else
echo "Not pushing to dockerhub for tag=${CIRCLE_TAG} branch=${CIRCLE_BRANCH}"
fi
workflows:
version: 2
build-deploy:
jobs:
- build:
filters:
tags:
only: /.*/
- deploy:
requires:
- build
filters:
tags:
only: /.*/

View File

@ -1,14 +0,0 @@
language: python
python:
- "2.7"
- "3.4"
- "3.5"
- "3.6"
install:
- pip install -r requirements.txt
- pip install -r test-requirements.txt
script:
- nosetests
- flake8 pywebpush
after_success:
- codecov

View File

@ -1,3 +1,30 @@
# I am terrible at keeping this up-to-date.
## 1.14.0 (2021-07-28)
bug: accept all VAPID key instances (thanks @mthu)
## 1.13.0 (2021-03-15)
Support requests_session param in webpush fn too
## 1.12.0 (2021-03-15)
chore: library update, remove nose tests
## 1.11.0 (2020-04-29)
feat: add `--head` to read headers out of a json file (thanks @braedon)
## 1.10.2 (2020-04-11)
bug: update min vapid requirement to 1.7.0
## 1.10.1 (2019-12-03)
feat: use six.text_type instead of six.string_types
## 1.10.0 (2019-08-13)
feat: Add `--verbose` flag with some initial commentary
bug: Update tests to use latest VAPID version
## 1.9.4 (2019-05-09)
bug: update vapid `exp` header if missing or expired
## 0.7.0 (2017-02-14)
feat: update to http-ece 0.7.0 (with draft-06 support)
feat: Allow empty payloads for send()

15
CODE_OF_CONDUCT.md Normal file
View File

@ -0,0 +1,15 @@
# Community Participation Guidelines
This repository is governed by Mozilla's code of conduct and etiquette guidelines.
For more details, please read the
[Mozilla Community Participation Guidelines](https://www.mozilla.org/about/governance/policies/participation/).
## How to Report
For more information on how to report violations of the Community Participation Guidelines, please read our '[How to Report](https://www.mozilla.org/about/governance/policies/participation/reporting/)' page.
<!--
## Project Specific Etiquette
In some cases, there will be additional project etiquette i.e.: (https://bugzilla.mozilla.org/page.cgi?id=etiquette.html).
Please update for your project.
-->

View File

@ -1,7 +1,7 @@
[![Build
Status](https://travis-ci.org/web-push-libs/pywebpush.svg?branch=master)](https://travis-ci.org/web-push-libs/pywebpush)
Status](https://travis-ci.org/web-push-libs/pywebpush.svg?branch=main)](https://travis-ci.org/web-push-libs/pywebpush)
[![Requirements
Status](https://requires.io/github/web-push-libs/pywebpush/requirements.svg?branch=master)](https://requires.io/github/web-push-libs/pywebpush/requirements/?branch=master)
Status](https://requires.io/github/web-push-libs/pywebpush/requirements.svg?branch=main)](https://requires.io/github/web-push-libs/pywebpush/requirements/?branch=main)
# Webpush Data encryption library for Python
@ -32,7 +32,10 @@ and push data.
As illustration, a `subscription_info` object may look like:
```json
{"endpoint": "https://updates.push.services.mozilla.com/push/v1/gAA...", "keys": {"auth": "k8J...", "p256dh": "BOr..."}}
{
"endpoint": "https://updates.push.services.mozilla.com/push/v1/gAA...",
"keys": { "auth": "k8J...", "p256dh": "BOr..." }
}
```
How you send the PushSubscription data to your backend, store it
@ -60,19 +63,19 @@ in the `subscription_info` block.
**Parameters**
*subscription_info* - The `dict` of the subscription info (described above).
_subscription_info_ - The `dict` of the subscription info (described above).
*data* - can be any serial content (string, bit array, serialized JSON, etc), but be sure that your receiving
_data_ - can be any serial content (string, bit array, serialized JSON, etc), but be sure that your receiving
application is able to parse and understand it. (e.g. `data = "Mary had a little lamb."`)
*content_type* - specifies the form of Encryption to use, either `'aesgcm'` or the newer `'aes128gcm'`. NOTE that
not all User Agents can decrypt `'aes128gcm'`, so the library defaults to the older form.
_content_type_ - specifies the form of Encryption to use, either `'aes128gcm'` or the deprecated `'aesgcm'`. NOTE that
not all User Agents can decrypt `'aesgcm'`, so the library defaults to the RFC 8188 standard form.
*vapid_claims* - a `dict` containing the VAPID claims required for authorization (See
_vapid_claims_ - a `dict` containing the VAPID claims required for authorization (See
[py_vapid](https://github.com/web-push-libs/vapid/tree/master/python) for more details). If `aud` is not specified,
pywebpush will attempt to auto-fill from the `endpoint`.
*vapid_private_key* - Either a path to a VAPID EC2 private key PEM file, or a string containing the DER representation.
_vapid_private_key_ - Either a path to a VAPID EC2 private key PEM file, or a string containing the DER representation.
(See [py_vapid](https://github.com/web-push-libs/vapid/tree/master/python) for more details.) The `private_key` may be
a base64 encoded DER formatted private key, or the path to an OpenSSL exported private key file.
@ -120,29 +123,29 @@ can pass just `wp = WebPusher(subscription_info)`. This will return a `WebPusher
The following methods are available:
#### `.send(data, headers={}, ttl=0, gcm_key="", reg_id="", content_encoding="aesgcm", curl=False, timeout=None)`
#### `.send(data, headers={}, ttl=0, gcm_key="", reg_id="", content_encoding="aes128gcm", curl=False, timeout=None)`
Send the data using additional parameters. On error, returns a `WebPushException`
**Parameters**
*data* Binary string of data to send
_data_ Binary string of data to send
*headers* A `dict` containing any additional headers to send
_headers_ A `dict` containing any additional headers to send
*ttl* Message Time To Live on Push Server waiting for the client to reconnect (in seconds)
_ttl_ Message Time To Live on Push Server waiting for the client to reconnect (in seconds)
*gcm_key* Google Cloud Messaging key (if using the older GCM push system) This is the API key obtained from the Google
_gcm_key_ Google Cloud Messaging key (if using the older GCM push system) This is the API key obtained from the Google
Developer Console.
*reg_id* Google Cloud Messaging registration ID (will be extracted from endpoint if not specified)
_reg_id_ Google Cloud Messaging registration ID (will be extracted from endpoint if not specified)
*content_encoding* ECE content encoding type (defaults to "aesgcm")
_content_encoding_ ECE content encoding type (defaults to "aes128gcm")
*curl* Do not execute the POST, but return as a `curl` command. This will write the encrypted content to a local file
_curl_ Do not execute the POST, but return as a `curl` command. This will write the encrypted content to a local file
named `encrpypted.data`. This command is meant to be used for debugging purposes.
*timeout* timeout for requests POST query.
_timeout_ timeout for requests POST query.
See [requests documentation](http://docs.python-requests.org/en/master/user/quickstart/#timeouts).
**Example**
@ -153,15 +156,15 @@ to send from Chrome using the old GCM mode:
WebPusher(subscription_info).send(data, headers, ttl, gcm_key)
```
#### `.encode(data, content_encoding="aesgcm")`
#### `.encode(data, content_encoding="aes128gcm")`
Encode the `data` for future use. On error, returns a `WebPushException`
**Parameters**
*data* Binary string of data to send
_data_ Binary string of data to send
*content_encoding* ECE content encoding type (defaults to "aesgcm")
_content_encoding_ ECE content encoding type (defaults to "aes128gcm")
**Example**
@ -169,4 +172,32 @@ Encode the `data` for future use. On error, returns a `WebPushException`
encoded_data = WebPush(subscription_info).encode(data)
```
## Stand Alone Webpush
If you're not really into coding your own solution, there's also a "stand-alone" `pywebpush` command in the
./bin directory.
This uses two files:
- the _data_ file, which contains the message to send, in whatever form you like.
- the _subscription info_ file, which contains the subscription information as JSON encoded data. This is usually returned by the Push `subscribe` method and looks something like:
```json
{
"endpoint": "https://push...",
"keys": {
"auth": "ab01...",
"p256dh": "aa02..."
}
}
```
If you're interested in just testing your applications WebPush interface, you could use the Command Line:
```bash
./bin/pywebpush --data stuff_to_send.data --info subscription.info
```
which will encrypt and send the contents of `stuff_to_send.data`.
See `./bin/pywebpush --help` for available commands and options.

View File

@ -10,12 +10,12 @@ available on `github <https://github.com/mozilla-services/pywebpush>`__.
Installation
------------
You'll need to run ``python virtualenv``. Then
Youll need to run ``python virtualenv``. Then
::
bin/pip install -r requirements.txt
bin/python setup.py develop
bin/pip install -r requirements.txt
bin/python setup.py develop
Usage
-----
@ -31,26 +31,26 @@ As illustration, a ``subscription_info`` object may look like:
.. code:: json
{"endpoint": "https://updates.push.services.mozilla.com/push/v1/gAA...", "keys": {"auth": "k8J...", "p256dh": "BOr..."}}
{"endpoint": "https://updates.push.services.mozilla.com/push/v1/gAA...", "keys": {"auth": "k8J...", "p256dh": "BOr..."}}
How you send the PushSubscription data to your backend, store it
referenced to the user who requested it, and recall it when there's a
referenced to the user who requested it, and recall it when theres a
new push subscription update is left as an exercise for the reader.
Sending Data using ``webpush()`` One Call
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
In many cases, your code will be sending a single message to many
recipients. There's a "One Call" function which will make things easier.
recipients. Theres a “One Call” function which will make things easier.
.. code:: python
from pywebpush import webpush
from pywebpush import webpush
webpush(subscription_info,
data,
vapid_private_key="Private Key or File Path[1]",
vapid_claims={"sub": "mailto:YourEmailAddress"})
webpush(subscription_info,
data,
vapid_private_key="Private Key or File Path[1]",
vapid_claims={"sub": "mailto:YourEmailAddress"})
This will encode ``data``, add the appropriate VAPID auth headers if
required and send it to the push server identified in the
@ -58,66 +58,67 @@ required and send it to the push server identified in the
**Parameters**
*subscription\_info* - The ``dict`` of the subscription info (described
*subscription_info* - The ``dict`` of the subscription info (described
above).
*data* - can be any serial content (string, bit array, serialized JSON,
etc), but be sure that your receiving application is able to parse and
understand it. (e.g. ``data = "Mary had a little lamb."``)
understand it. (e.g. ``data = "Mary had a little lamb."``)
*content\_type* - specifies the form of Encryption to use, either
``'aesgcm'`` or the newer ``'aes128gcm'``. NOTE that not all User Agents
can decrypt ``'aes128gcm'``, so the library defaults to the older form.
*content_type* - specifies the form of Encryption to use, either
``'aes128gcm'`` or the deprecated ``'aesgcm'``. NOTE that not all User
Agents can decrypt ``'aesgcm'``, so the library defaults to the RFC 8188
standard form.
*vapid\_claims* - a ``dict`` containing the VAPID claims required for
*vapid_claims* - a ``dict`` containing the VAPID claims required for
authorization (See
`py\_vapid <https://github.com/web-push-libs/vapid/tree/master/python>`__
`py_vapid <https://github.com/web-push-libs/vapid/tree/master/python>`__
for more details). If ``aud`` is not specified, pywebpush will attempt
to auto-fill from the ``endpoint``.
*vapid\_private\_key* - Either a path to a VAPID EC2 private key PEM
file, or a string containing the DER representation. (See
`py\_vapid <https://github.com/web-push-libs/vapid/tree/master/python>`__
*vapid_private_key* - Either a path to a VAPID EC2 private key PEM file,
or a string containing the DER representation. (See
`py_vapid <https://github.com/web-push-libs/vapid/tree/master/python>`__
for more details.) The ``private_key`` may be a base64 encoded DER
formatted private key, or the path to an OpenSSL exported private key
file.
e.g. the output of:
e.g. the output of:
::
openssl ecparam -name prime256v1 -genkey -noout -out private_key.pem
openssl ecparam -name prime256v1 -genkey -noout -out private_key.pem
**Example**
.. code:: python
from pywebpush import webpush, WebPushException
from pywebpush import webpush, WebPushException
try:
webpush(
subscription_info={
"endpoint": "https://push.example.com/v1/12345",
"keys": {
"p256dh": "0123abcde...",
"auth": "abc123..."
}},
data="Mary had a little lamb, with a nice mint jelly",
vapid_private_key="path/to/vapid_private.pem",
vapid_claims={
"sub": "mailto:YourNameHere@example.org",
}
)
except WebPushException as ex:
print("I'm sorry, Dave, but I can't do that: {}", repr(ex))
# Mozilla returns additional information in the body of the response.
if ex.response and ex.response.json():
extra = ex.response.json()
print("Remote service replied with a {}:{}, {}",
extra.code,
extra.errno,
extra.message
)
try:
webpush(
subscription_info={
"endpoint": "https://push.example.com/v1/12345",
"keys": {
"p256dh": "0123abcde...",
"auth": "abc123..."
}},
data="Mary had a little lamb, with a nice mint jelly",
vapid_private_key="path/to/vapid_private.pem",
vapid_claims={
"sub": "mailto:YourNameHere@example.org",
}
)
except WebPushException as ex:
print("I'm sorry, Dave, but I can't do that: {}", repr(ex))
# Mozilla returns additional information in the body of the response.
if ex.response and ex.response.json():
extra = ex.response.json()
print("Remote service replied with a {}:{}, {}",
extra.code,
extra.errno,
extra.message
)
Methods
~~~~~~~
@ -129,8 +130,8 @@ object.
The following methods are available:
``.send(data, headers={}, ttl=0, gcm_key="", reg_id="", content_encoding="aesgcm", curl=False, timeout=None)``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
``.send(data, headers={}, ttl=0, gcm_key="", reg_id="", content_encoding="aes128gcm", curl=False, timeout=None)``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Send the data using additional parameters. On error, returns a
``WebPushException``
@ -144,13 +145,13 @@ Send the data using additional parameters. On error, returns a
*ttl* Message Time To Live on Push Server waiting for the client to
reconnect (in seconds)
*gcm\_key* Google Cloud Messaging key (if using the older GCM push
*gcm_key* Google Cloud Messaging key (if using the older GCM push
system) This is the API key obtained from the Google Developer Console.
*reg\_id* Google Cloud Messaging registration ID (will be extracted from
*reg_id* Google Cloud Messaging registration ID (will be extracted from
endpoint if not specified)
*content\_encoding* ECE content encoding type (defaults to "aesgcm")
*content_encoding* ECE content encoding type (defaults to “aes128gcm”)
*curl* Do not execute the POST, but return as a ``curl`` command. This
will write the encrypted content to a local file named
@ -166,10 +167,10 @@ to send from Chrome using the old GCM mode:
.. code:: python
WebPusher(subscription_info).send(data, headers, ttl, gcm_key)
WebPusher(subscription_info).send(data, headers, ttl, gcm_key)
``.encode(data, content_encoding="aesgcm")``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
``.encode(data, content_encoding="aes128gcm")``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Encode the ``data`` for future use. On error, returns a
``WebPushException``
@ -178,15 +179,46 @@ Encode the ``data`` for future use. On error, returns a
*data* Binary string of data to send
*content\_encoding* ECE content encoding type (defaults to "aesgcm")
*content_encoding* ECE content encoding type (defaults to “aes128gcm”)
**Example**
.. code:: python
encoded_data = WebPush(subscription_info).encode(data)
encoded_data = WebPush(subscription_info).encode(data)
.. |Build Status| image:: https://travis-ci.org/web-push-libs/pywebpush.svg?branch=master
Stand Alone Webpush
-------------------
If youre not really into coding your own solution, theres also a
“stand-alone” ``pywebpush`` command in the ./bin directory.
This uses two files: \* the *data* file, which contains the message to
send, in whatever form you like. \* the *subscription info* file, which
contains the subscription information as JSON encoded data. This is
usually returned by the Push ``subscribe`` method and looks something
like:
.. code:: json
{"endpoint": "https://push...",
"keys": {
"auth": "ab01...",
"p256dh": "aa02..."
}}
If youre interested in just testing your applications WebPush
interface, you could use the Command Line:
.. code:: bash
./bin/pywebpush --data stuff_to_send.data --info subscription.info
which will encrypt and send the contents of ``stuff_to_send.data``.
See ``./bin/pywebpush --help`` for available commands and options.
.. |Build Status| image:: https://travis-ci.org/web-push-libs/pywebpush.svg?branch=main
:target: https://travis-ci.org/web-push-libs/pywebpush
.. |Requirements Status| image:: https://requires.io/github/web-push-libs/pywebpush/requirements.svg?branch=master
:target: https://requires.io/github/web-push-libs/pywebpush/requirements/?branch=master
.. |Requirements Status| image:: https://requires.io/github/web-push-libs/pywebpush/requirements.svg?branch=main
:target: https://requires.io/github/web-push-libs/pywebpush/requirements/?branch=main

View File

@ -1,12 +0,0 @@
# circle ci file
machine:
post:
- pyenv global 2.7.13 3.5
dependencies:
pre:
- pip install -r test-requirements.txt
test:
override:
- nosetests -v pywebpush

View File

@ -6,6 +6,7 @@ import base64
from copy import deepcopy
import json
import os
import time
try:
from urllib.parse import urlparse
@ -17,7 +18,8 @@ import http_ece
import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import ec
from py_vapid import Vapid
from cryptography.hazmat.primitives import serialization
from py_vapid import Vapid, Vapid01
class WebPushException(Exception):
@ -112,10 +114,12 @@ class WebPusher:
valid_encodings = [
# "aesgcm128", # this is draft-0, but DO NOT USE.
"aesgcm", # draft-httpbis-encryption-encoding-01
"aes128gcm" # draft-httpbis-encryption-encoding-04
"aes128gcm" # RFC8188 Standard encoding
]
verbose = False
def __init__(self, subscription_info, requests_session=None):
def __init__(self, subscription_info, requests_session=None,
verbose=False):
"""Initialize using the info provided by the client PushSubscription
object (See
https://developer.mozilla.org/en-US/docs/Web/API/PushManager/subscribe)
@ -128,7 +132,12 @@ class WebPusher:
to the same client.
:type requests_session: requests.Session
:param verbose: provide verbose feedback
:type verbose: bool
"""
self.verbose = verbose
if requests_session is None:
self.requests_method = requests
else:
@ -143,7 +152,7 @@ class WebPusher:
for k in ['p256dh', 'auth']:
if keys.get(k) is None:
raise WebPushException("Missing keys value: {}".format(k))
if isinstance(keys[k], six.string_types):
if isinstance(keys[k], six.text_type):
keys[k] = bytes(keys[k].encode('utf8'))
receiver_raw = base64.urlsafe_b64decode(
self._repad(keys['p256dh']))
@ -153,11 +162,15 @@ class WebPusher:
self.auth_key = base64.urlsafe_b64decode(
self._repad(keys['auth']))
def verb(self, msg, *args, **kwargs):
if self.verbose:
print(msg.format(*args, **kwargs))
def _repad(self, data):
"""Add base64 padding to the end of a string, if required"""
return data + b"===="[:len(data) % 4]
def encode(self, data, content_encoding="aesgcm"):
def encode(self, data, content_encoding="aes128gcm"):
"""Encrypt the data.
:param data: A serialized block of byte data (String, JSON, bit array,
@ -165,49 +178,65 @@ class WebPusher:
to understand it.
:type data: str
:param content_encoding: The content_encoding type to use to encrypt
the data. Defaults to draft-01 "aesgcm". Latest draft-04 is
"aes128gcm", however not all clients may be able to use this
format.
the data. Defaults to RFC8188 "aes128gcm". The previous draft-01 is
"aesgcm", however this format is now deprecated.
:type content_encoding: enum("aesgcm", "aes128gcm")
"""
# Salt is a random 16 byte array.
if not data:
self.verb("No data found...")
return
if not self.auth_key or not self.receiver_key:
raise WebPushException("No keys specified in subscription info")
self.verb("Encoding data...")
salt = None
if content_encoding not in self.valid_encodings:
raise WebPushException("Invalid content encoding specified. "
"Select from " +
json.dumps(self.valid_encodings))
if (content_encoding == "aesgcm"):
if content_encoding == "aesgcm":
self.verb("Generating salt for aesgcm...")
salt = os.urandom(16)
# The server key is an ephemeral ECDH key used only for this
# transaction
server_key = ec.generate_private_key(ec.SECP256R1, default_backend())
crypto_key = base64.urlsafe_b64encode(
server_key.public_key().public_numbers().encode_point()
).strip(b'=')
crypto_key = server_key.public_key().public_bytes(
encoding=serialization.Encoding.X962,
format=serialization.PublicFormat.UncompressedPoint
)
if isinstance(data, six.string_types):
if isinstance(data, six.text_type):
data = bytes(data.encode('utf8'))
encrypted = http_ece.encrypt(
data,
salt=salt,
keyid=crypto_key.decode(),
private_key=server_key,
dh=self.receiver_key,
auth_secret=self.auth_key,
version=content_encoding)
reply = CaseInsensitiveDict({
'crypto_key': crypto_key,
'body': encrypted,
})
if salt:
reply['salt'] = base64.urlsafe_b64encode(salt).strip(b'=')
if content_encoding == "aes128gcm":
self.verb("Encrypting to aes128gcm...")
encrypted = http_ece.encrypt(
data,
salt=salt,
private_key=server_key,
dh=self.receiver_key,
auth_secret=self.auth_key,
version=content_encoding)
reply = CaseInsensitiveDict({
'body': encrypted
})
else:
self.verb("Encrypting to aesgcm...")
crypto_key = base64.urlsafe_b64encode(crypto_key).strip(b'=')
encrypted = http_ece.encrypt(
data,
salt=salt,
private_key=server_key,
keyid=crypto_key.decode(),
dh=self.receiver_key,
auth_secret=self.auth_key,
version=content_encoding)
reply = CaseInsensitiveDict({
'crypto_key': crypto_key,
'body': encrypted,
})
if salt:
reply['salt'] = base64.urlsafe_b64encode(salt).strip(b'=')
return reply
def as_curl(self, endpoint, encoded_data, headers):
@ -235,13 +264,14 @@ class WebPusher:
f.write(encoded_data)
data = "--data-binary @encrypted.data"
if 'content-length' not in headers:
self.verb("Generating content-length header...")
header_list.append(
'-H "content-length: {}" \\ \n'.format(len(data)))
'-H "content-length: {}" \\ \n'.format(len(encoded_data)))
return ("""curl -vX POST {url} \\\n{headers}{data}""".format(
url=endpoint, headers="".join(header_list), data=data))
def send(self, data=None, headers=None, ttl=0, gcm_key=None, reg_id=None,
content_encoding="aesgcm", curl=False, timeout=None):
content_encoding="aes128gcm", curl=False, timeout=None):
"""Encode and send the data to the Push Service.
:param data: A serialized block of data (see encode() ).
@ -258,7 +288,7 @@ class WebPusher:
:param reg_id: registration id of the recipient. If not provided,
it will be extracted from the endpoint.
:type reg_id: str
:param content_encoding: ECE content encoding (defaults to "aesgcm")
:param content_encoding: ECE content encoding (defaults to "aes128gcm")
:type content_encoding: str
:param curl: Display output as `curl` command instead of sending
:type curl: bool
@ -272,25 +302,42 @@ class WebPusher:
encoded = {}
headers = CaseInsensitiveDict(headers)
if data:
encoded = self.encode(data)
# Append the p256dh to the end of any existing crypto-key
crypto_key = headers.get("crypto-key", "")
if crypto_key:
# due to some confusion by a push service provider, we should
# use ';' instead of ',' to append the headers.
# see https://github.com/webpush-wg/webpush-encryption/issues/6
crypto_key += ';'
crypto_key += ("dh=" + encoded["crypto_key"].decode('utf8'))
encoded = self.encode(data, content_encoding)
if "crypto_key" in encoded:
# Append the p256dh to the end of any existing crypto-key
crypto_key = headers.get("crypto-key", "")
if crypto_key:
# due to some confusion by a push service provider, we
# should use ';' instead of ',' to append the headers.
# see
# https://github.com/webpush-wg/webpush-encryption/issues/6
crypto_key += ';'
crypto_key += (
"dh=" + encoded["crypto_key"].decode('utf8'))
headers.update({
'crypto-key': crypto_key
})
if "salt" in encoded:
headers.update({
'encryption': "salt=" + encoded['salt'].decode('utf8')
})
headers.update({
'crypto-key': crypto_key,
'content-encoding': content_encoding,
'encryption': "salt=" + encoded['salt'].decode('utf8'),
})
if gcm_key:
endpoint = 'https://android.googleapis.com/gcm/send'
# guess if it is a legacy GCM project key or actual FCM key
# gcm keys are all about 40 chars (use 100 for confidence),
# fcm keys are 153-175 chars
if len(gcm_key) < 100:
self.verb("Guessing this is legacy GCM...")
endpoint = 'https://android.googleapis.com/gcm/send'
else:
self.verb("Guessing this is FCM...")
endpoint = 'https://fcm.googleapis.com/fcm/send'
reg_ids = []
if not reg_id:
reg_id = self.subscription_info['endpoint'].rsplit('/', 1)[-1]
self.verb("Fetching out registration id: {}", reg_id)
reg_ids.append(reg_id)
gcm_data = dict()
gcm_data['registration_ids'] = reg_ids
@ -309,25 +356,35 @@ class WebPusher:
endpoint = self.subscription_info['endpoint']
if 'ttl' not in headers or ttl:
self.verb("Generating TTL of 0...")
headers['ttl'] = str(ttl or 0)
# Additionally useful headers:
# Authorization / Crypto-Key (VAPID headers)
if curl:
return self.as_curl(endpoint, encoded_data, headers)
return self.requests_method.post(endpoint,
self.verb("\nSending request to"
"\n\thost: {}\n\theaders: {}\n\tdata: {}",
endpoint, headers, encoded_data)
resp = self.requests_method.post(endpoint,
data=encoded_data,
headers=headers,
timeout=timeout)
self.verb("\nResponse:\n\tcode: {}\n\tbody: {}\n",
resp.status_code, resp.text or "Empty")
return resp
def webpush(subscription_info,
data=None,
vapid_private_key=None,
vapid_claims=None,
content_encoding="aesgcm",
content_encoding="aes128gcm",
curl=False,
timeout=None,
ttl=0):
ttl=0,
verbose=False,
headers=None,
requests_session=None):
"""
One call solution to endcode and send `data` to the endpoint
contained in `subscription_info` using optional VAPID auth headers.
@ -369,18 +426,39 @@ def webpush(subscription_info,
:type timeout: float or tuple
:param ttl: Time To Live
:type ttl: int
:param verbose: Provide verbose feedback
:type verbose: bool
:return requests.Response or string
:param headers: Dictionary of extra HTTP headers to include
:type headers: dict
"""
if headers is None:
headers = dict()
else:
# Ensure we don't leak VAPID headers by mutating the passed in dict.
headers = headers.copy()
vapid_headers = None
if vapid_claims:
if verbose:
print("Generating VAPID headers...")
if not vapid_claims.get('aud'):
url = urlparse(subscription_info.get('endpoint'))
aud = "{}://{}".format(url.scheme, url.netloc)
vapid_claims['aud'] = aud
# Remember, passed structures are mutable in python.
# It's possible that a previously set `exp` field is no longer valid.
if (not vapid_claims.get('exp')
or vapid_claims.get('exp') < int(time.time())):
# encryption lives for 12 hours
vapid_claims['exp'] = int(time.time()) + (12 * 60 * 60)
if verbose:
print("Setting VAPID expry to {}...".format(
vapid_claims['exp']))
if not vapid_private_key:
raise WebPushException("VAPID dict missing 'private_key'")
if isinstance(vapid_private_key, Vapid):
if isinstance(vapid_private_key, Vapid01):
vv = vapid_private_key
elif os.path.isfile(vapid_private_key):
# Presume that key from file is handled correctly by
@ -389,17 +467,25 @@ def webpush(subscription_info,
private_key_file=vapid_private_key) # pragma no cover
else:
vv = Vapid.from_string(private_key=vapid_private_key)
if verbose:
print("\t claims: {}".format(vapid_claims))
vapid_headers = vv.sign(vapid_claims)
response = WebPusher(subscription_info).send(
if verbose:
print("\t headers: {}".format(vapid_headers))
headers.update(vapid_headers)
response = WebPusher(
subscription_info, requests_session=requests_session, verbose=verbose
).send(
data,
vapid_headers,
headers,
ttl=ttl,
content_encoding=content_encoding,
curl=curl,
timeout=timeout,
)
if not curl and response.status_code > 202:
raise WebPushException("Push failed: {} {}".format(
response.status_code, response.reason),
raise WebPushException("Push failed: {} {}\nResponse body:{}".format(
response.status_code, response.reason, response.text),
response=response)
return response

View File

@ -9,12 +9,14 @@ def get_config():
parser = argparse.ArgumentParser(description="WebPush tool")
parser.add_argument("--data", '-d', help="Data file")
parser.add_argument("--info", "-i", help="Subscription Info JSON file")
parser.add_argument("--head", help="Header Info JSON file")
parser.add_argument("--head", help="Header Info JSON file")
parser.add_argument("--claims", help="Vapid claim file")
parser.add_argument("--key", help="Vapid private key file path")
parser.add_argument("--curl", help="Don't send, display as curl command",
default=False, action="store_true")
parser.add_argument("--encoding", default="aesgcm")
parser.add_argument("--encoding", default="aes128gcm")
parser.add_argument("--verbose", "-v", help="Provide verbose feedback",
default=False, action="store_true")
args = parser.parse_args()
@ -53,7 +55,9 @@ def main():
vapid_private_key=args.key,
vapid_claims=args.claims,
curl=args.curl,
content_encoding=args.encoding)
content_encoding=args.encoding,
verbose=args.verbose,
headers=args.head)
print(result)
except Exception as ex:
print("ERROR: {}".format(ex))

View File

@ -2,11 +2,12 @@ import base64
import json
import os
import unittest
import time
from mock import patch, Mock
from nose.tools import eq_, ok_, assert_is_not, assert_raises
import http_ece
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
import py_vapid
@ -38,8 +39,10 @@ class WebpushTestCase(unittest.TestCase):
def _get_pubkey_str(self, priv_key):
return base64.urlsafe_b64encode(
priv_key.public_key().public_numbers().encode_point()
).strip(b'=')
priv_key.public_key().public_bytes(
encoding=serialization.Encoding.X962,
format=serialization.PublicFormat.UncompressedPoint
)).strip(b'=')
def test_init(self):
# use static values so we know what to look for in the reply
@ -75,12 +78,11 @@ class WebpushTestCase(unittest.TestCase):
"keys": {'p256dh': 'AAA=', 'auth': 'AAA='}})
push = WebPusher(subscription_info)
assert_is_not(push.subscription_info, subscription_info)
assert_is_not(push.subscription_info['keys'],
subscription_info['keys'])
eq_(push.subscription_info['endpoint'], subscription_info['endpoint'])
eq_(push.receiver_key, rk_decode)
eq_(push.auth_key, b'\x93\xc2U\xea\xc8\xddn\x10"\xd6}\xff,0K\xbc')
assert push.subscription_info != subscription_info
assert push.subscription_info['keys'] != subscription_info['keys']
assert push.subscription_info['endpoint'] == subscription_info['endpoint']
assert push.receiver_key == rk_decode
assert push.auth_key == b'\x93\xc2U\xea\xc8\xddn\x10"\xd6}\xff,0K\xbc'
def test_encode(self):
for content_encoding in ["aesgcm", "aes128gcm"]:
@ -100,11 +102,12 @@ class WebpushTestCase(unittest.TestCase):
if 'salt' in encoded:
raw_salt = base64.urlsafe_b64decode(
push._repad(encoded['salt']))
raw_dh = base64.urlsafe_b64decode(
push._repad(encoded['crypto_key']))
raw_dh = None
if content_encoding != "aes128gcm":
raw_dh = base64.urlsafe_b64decode(
push._repad(encoded['crypto_key']))
raw_auth = base64.urlsafe_b64decode(
push._repad(subscription_info['keys']['auth']))
decoded = http_ece.decrypt(
encoded['body'],
salt=raw_salt,
@ -113,7 +116,7 @@ class WebpushTestCase(unittest.TestCase):
auth_secret=raw_auth,
version=content_encoding
)
eq_(decoded.decode('utf8'), data)
assert decoded.decode('utf8') == data
def test_bad_content_encoding(self):
subscription_info = self._gen_subscription_info()
@ -131,14 +134,13 @@ class WebpushTestCase(unittest.TestCase):
"Authentication": "bearer vapid"}
data = "Mary had a little lamb"
WebPusher(subscription_info).send(data, headers)
eq_(subscription_info.get('endpoint'), mock_post.call_args[0][0])
assert subscription_info.get('endpoint') == mock_post.call_args[0][0]
pheaders = mock_post.call_args[1].get('headers')
eq_(pheaders.get('ttl'), '0')
ok_('encryption' in pheaders)
eq_(pheaders.get('AUTHENTICATION'), headers.get('Authentication'))
assert pheaders.get('ttl') == '0'
assert pheaders.get('AUTHENTICATION') == headers.get('Authentication')
ckey = pheaders.get('crypto-key')
ok_('pre-existing' in ckey)
eq_(pheaders.get('content-encoding'), 'aesgcm')
assert 'pre-existing' in ckey
assert pheaders.get('content-encoding') == 'aes128gcm'
@patch("requests.post")
def test_send_vapid(self, mock_post):
@ -149,11 +151,13 @@ class WebpushTestCase(unittest.TestCase):
subscription_info=subscription_info,
data=data,
vapid_private_key=self.vapid_key,
vapid_claims={"sub": "mailto:ops@example.com"}
vapid_claims={"sub": "mailto:ops@example.com"},
content_encoding="aesgcm",
headers={"Test-Header": "test-value"}
)
eq_(subscription_info.get('endpoint'), mock_post.call_args[0][0])
assert subscription_info.get('endpoint') == mock_post.call_args[0][0]
pheaders = mock_post.call_args[1].get('headers')
eq_(pheaders.get('ttl'), '0')
assert pheaders.get('ttl') == '0'
def repad(str):
return str + "===="[:len(str) % 4]
@ -163,13 +167,12 @@ class WebpushTestCase(unittest.TestCase):
repad(pheaders['authorization'].split('.')[1])
).decode('utf8')
)
ok_(subscription_info.get('endpoint').startswith(auth['aud']))
ok_('encryption' in pheaders)
ok_('WebPush' in pheaders.get('authorization'))
assert subscription_info.get('endpoint').startswith(auth['aud'])
assert 'vapid' in pheaders.get('authorization')
ckey = pheaders.get('crypto-key')
ok_('p256ecdsa=' in ckey)
ok_('dh=' in ckey)
eq_(pheaders.get('content-encoding'), 'aesgcm')
assert 'dh=' in ckey
assert pheaders.get('content-encoding') == 'aesgcm'
assert pheaders.get('test-header') == 'test-value'
@patch.object(WebPusher, "send")
@patch.object(py_vapid.Vapid, "sign")
@ -188,21 +191,41 @@ class WebpushTestCase(unittest.TestCase):
vapid_sign.assert_called_once_with(claims)
pusher_send.assert_called_once()
@patch.object(WebPusher, "send")
@patch.object(py_vapid.Vapid, "sign")
def test_webpush_vapid_exp(self, vapid_sign, pusher_send):
pusher_send.return_value.status_code = 200
subscription_info = self._gen_subscription_info()
data = "Mary had a little lamb"
vapid_key = py_vapid.Vapid.from_string(self.vapid_key)
claims = dict(sub="mailto:ops@example.com",
aud="https://example.com",
exp=int(time.time() - 48600))
webpush(
subscription_info=subscription_info,
data=data,
vapid_private_key=vapid_key,
vapid_claims=claims,
)
vapid_sign.assert_called_once_with(claims)
pusher_send.assert_called_once()
assert claims['exp'] > int(time.time())
@patch("requests.post")
def test_send_bad_vapid_no_key(self, mock_post):
mock_post.return_value.status_code = 200
subscription_info = self._gen_subscription_info()
data = "Mary had a little lamb"
assert_raises(WebPushException,
webpush,
subscription_info=subscription_info,
data=data,
vapid_claims={
"aud": "https://example.com",
"sub": "mailto:ops@example.com"
}
)
self.assertRaises(
WebPushException,
webpush,
subscription_info=subscription_info,
data=data,
vapid_claims={
"aud": "https://example.com",
"sub": "mailto:ops@example.com"
})
@patch("requests.post")
def test_send_bad_vapid_bad_return(self, mock_post):
@ -210,16 +233,16 @@ class WebpushTestCase(unittest.TestCase):
subscription_info = self._gen_subscription_info()
data = "Mary had a little lamb"
assert_raises(WebPushException,
webpush,
subscription_info=subscription_info,
data=data,
vapid_claims={
"aud": "https://example.com",
"sub": "mailto:ops@example.com"
},
vapid_private_key=self.vapid_key
)
self.assertRaises(
WebPushException,
webpush,
subscription_info=subscription_info,
data=data,
vapid_claims={
"aud": "https://example.com",
"sub": "mailto:ops@example.com"
},
vapid_private_key=self.vapid_key)
@patch("requests.post")
def test_send_empty(self, mock_post):
@ -227,20 +250,20 @@ class WebpushTestCase(unittest.TestCase):
headers = {"Crypto-Key": "pre-existing",
"Authentication": "bearer vapid"}
WebPusher(subscription_info).send('', headers)
eq_(subscription_info.get('endpoint'), mock_post.call_args[0][0])
assert subscription_info.get('endpoint') == mock_post.call_args[0][0]
pheaders = mock_post.call_args[1].get('headers')
eq_(pheaders.get('ttl'), '0')
ok_('encryption' not in pheaders)
eq_(pheaders.get('AUTHENTICATION'), headers.get('Authentication'))
assert pheaders.get('ttl') == '0'
assert 'encryption' not in pheaders
assert pheaders.get('AUTHENTICATION') == headers.get('Authentication')
ckey = pheaders.get('crypto-key')
ok_('pre-existing' in ckey)
assert 'pre-existing' in ckey
def test_encode_empty(self):
subscription_info = self._gen_subscription_info()
headers = {"Crypto-Key": "pre-existing",
"Authentication": "bearer vapid"}
encoded = WebPusher(subscription_info).encode('', headers)
eq_(encoded, None)
assert encoded is None
def test_encode_no_crypto(self):
subscription_info = self._gen_subscription_info()
@ -249,21 +272,21 @@ class WebpushTestCase(unittest.TestCase):
"Authentication": "bearer vapid"}
data = 'Something'
pusher = WebPusher(subscription_info)
assert_raises(WebPushException,
pusher.encode,
data,
headers)
self.assertRaises(
WebPushException,
pusher.encode,
data,
headers)
@patch("requests.post")
def test_send_no_headers(self, mock_post):
subscription_info = self._gen_subscription_info()
data = "Mary had a little lamb"
WebPusher(subscription_info).send(data)
eq_(subscription_info.get('endpoint'), mock_post.call_args[0][0])
assert subscription_info.get('endpoint') == mock_post.call_args[0][0]
pheaders = mock_post.call_args[1].get('headers')
eq_(pheaders.get('ttl'), '0')
ok_('encryption' in pheaders)
eq_(pheaders.get('content-encoding'), 'aesgcm')
assert pheaders.get('ttl') == '0'
assert pheaders.get('content-encoding') == 'aes128gcm'
@patch("pywebpush.open")
def test_as_curl(self, opener):
@ -280,22 +303,20 @@ class WebpushTestCase(unittest.TestCase):
)
for s in [
"curl -vX POST https://example.com",
"-H \"crypto-key: p256ecdsa=",
"-H \"content-encoding: aesgcm\"",
"-H \"authorization: WebPush ",
"-H \"encryption: salt=",
"-H \"content-encoding: aes128gcm\"",
"-H \"authorization: vapid ",
"-H \"ttl: 0\"",
"-H \"content-length:"
]:
ok_(s in result)
assert s in result, "missing: {}".format(s)
def test_ci_dict(self):
ci = CaseInsensitiveDict({"Foo": "apple", "bar": "banana"})
eq_('apple', ci["foo"])
eq_('apple', ci.get("FOO"))
eq_('apple', ci.get("Foo"))
assert 'apple' == ci["foo"]
assert 'apple' == ci.get("FOO")
assert 'apple' == ci.get("Foo")
del (ci['FOO'])
eq_(None, ci.get('Foo'))
assert ci.get('Foo') is None
@patch("requests.post")
def test_gcm(self, mock_post):
@ -309,18 +330,18 @@ class WebpushTestCase(unittest.TestCase):
wp.send(data, headers, gcm_key="gcm_key_value")
pdata = json.loads(mock_post.call_args[1].get('data'))
pheaders = mock_post.call_args[1].get('headers')
eq_(pdata["registration_ids"][0], "regid123")
eq_(pheaders.get("authorization"), "key=gcm_key_value")
eq_(pheaders.get("content-type"), "application/json")
assert pdata["registration_ids"][0] == "regid123"
assert pheaders.get("authorization") == "key=gcm_key_value"
assert pheaders.get("content-type") == "application/json"
@patch("requests.post")
def test_timeout(self, mock_post):
mock_post.return_value.status_code = 200
subscription_info = self._gen_subscription_info()
WebPusher(subscription_info).send(timeout=5.2)
eq_(mock_post.call_args[1].get('timeout'), 5.2)
assert mock_post.call_args[1].get('timeout') == 5.2
webpush(subscription_info, timeout=10.001)
eq_(mock_post.call_args[1].get('timeout'), 10.001)
assert mock_post.call_args[1].get('timeout') == 10.001
@patch("requests.Session")
def test_send_using_requests_session(self, mock_session):
@ -330,15 +351,14 @@ class WebpushTestCase(unittest.TestCase):
data = "Mary had a little lamb"
WebPusher(subscription_info,
requests_session=mock_session).send(data, headers)
eq_(subscription_info.get('endpoint'),
mock_session.post.call_args[0][0])
assert subscription_info.get(
'endpoint') == mock_session.post.call_args[0][0]
pheaders = mock_session.post.call_args[1].get('headers')
eq_(pheaders.get('ttl'), '0')
ok_('encryption' in pheaders)
eq_(pheaders.get('AUTHENTICATION'), headers.get('Authentication'))
assert pheaders.get('ttl') == '0'
assert pheaders.get('AUTHENTICATION') == headers.get('Authentication')
ckey = pheaders.get('crypto-key')
ok_('pre-existing' in ckey)
eq_(pheaders.get('content-encoding'), 'aesgcm')
assert 'pre-existing' in ckey
assert pheaders.get('content-encoding') == 'aes128gcm'
class WebpushExceptionTestCase(unittest.TestCase):

View File

@ -1,4 +1,5 @@
cryptography>=1.8.1
cryptography>=2.6.1
http-ece>=1.0.1
requests>=2.12.0
py-vapid>=1.3.0
requests>=2.21.0
six>=1.15.0
py-vapid>=1.7.0

View File

@ -3,21 +3,22 @@ import os
from setuptools import find_packages, setup
__version__ = "1.7.0"
__version__ = "1.14.0"
def read_from(file):
reply = []
with io.open(os.path.join(here, file), encoding='utf8') as f:
for l in f:
l = l.strip()
if not l:
for line in f:
line = line.strip()
if not line:
break
if l[:2] == '-r':
reply += read_from(l.split(' ')[1])
if line[:2] == '-r':
reply += read_from(line.split(' ')[1])
continue
if l[0] != '#' or l[:2] != '//':
reply.append(l)
if line[0] != '#' or line[:2] != '//':
reply.append(line)
return reply
@ -37,19 +38,13 @@ setup(
"Topic :: Internet :: WWW/HTTP",
"Programming Language :: Python :: Implementation :: PyPy",
'Programming Language :: Python',
"Programming Language :: Python :: 2",
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.4",
"Programming Language :: Python :: 3.5",
"Programming Language :: Python :: 3.6",
],
keywords='push webpush publication',
author="JR Conlin",
author_email="src+webpusher@jrconlin.com",
url='https://github.com/web-push-libs/pywebpush',
license="MPL2",
test_suite="nose.collector",
include_package_data=True,
zip_safe=False,
install_requires=read_from('requirements.txt'),

View File

@ -1,5 +1,7 @@
-r requirements.txt
nose>=1.3.7
coverage>=4.2
pytest
coverage>=4.4.1
mock>=2.0.0
flake8>=3.2.0