feat: Add `--verbose` flag (#117)

Closes: #116
This commit is contained in:
JR Conlin 2019-08-13 15:16:50 -07:00 committed by GitHub
parent 9bc6f69b61
commit 10e2e1629c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 58 additions and 13 deletions

View File

@ -1,5 +1,9 @@
# I am terrible at keeping this up-to-date.
## 1.10.0 (2019-08-13)
feat: Add `--verbose` flag with some initial commentary
bug: Update tests to use latest VAPID version
## 1.9.4 (2019-05-09)
bug: update vapid `exp` header if missing or expired

View File

@ -116,8 +116,10 @@ class WebPusher:
"aesgcm", # draft-httpbis-encryption-encoding-01
"aes128gcm" # RFC8188 Standard encoding
]
verbose = False
def __init__(self, subscription_info, requests_session=None):
def __init__(self, subscription_info, requests_session=None,
verbose=False):
"""Initialize using the info provided by the client PushSubscription
object (See
https://developer.mozilla.org/en-US/docs/Web/API/PushManager/subscribe)
@ -130,7 +132,12 @@ class WebPusher:
to the same client.
:type requests_session: requests.Session
:param verbose: provide verbose feedback
:type verbose: bool
"""
self.verbose = verbose
if requests_session is None:
self.requests_method = requests
else:
@ -155,6 +162,10 @@ class WebPusher:
self.auth_key = base64.urlsafe_b64decode(
self._repad(keys['auth']))
def verb(self, msg, *args, **kwargs):
if self.verbose:
print(msg.format(*args, **kwargs))
def _repad(self, data):
"""Add base64 padding to the end of a string, if required"""
return data + b"===="[:len(data) % 4]
@ -174,15 +185,18 @@ class WebPusher:
"""
# Salt is a random 16 byte array.
if not data:
self.verb("No data found...")
return
if not self.auth_key or not self.receiver_key:
raise WebPushException("No keys specified in subscription info")
self.verb("Encoding data...")
salt = None
if content_encoding not in self.valid_encodings:
raise WebPushException("Invalid content encoding specified. "
"Select from " +
json.dumps(self.valid_encodings))
if content_encoding == "aesgcm":
self.verb("Generating salt for aesgcm...")
salt = os.urandom(16)
# The server key is an ephemeral ECDH key used only for this
# transaction
@ -195,6 +209,7 @@ class WebPusher:
if isinstance(data, six.string_types):
data = bytes(data.encode('utf8'))
if content_encoding == "aes128gcm":
self.verb("Encrypting to aes128gcm...")
encrypted = http_ece.encrypt(
data,
salt=salt,
@ -206,6 +221,7 @@ class WebPusher:
'body': encrypted
})
else:
self.verb("Encrypting to aesgcm...")
crypto_key = base64.urlsafe_b64encode(crypto_key).strip(b'=')
encrypted = http_ece.encrypt(
data,
@ -248,6 +264,7 @@ class WebPusher:
f.write(encoded_data)
data = "--data-binary @encrypted.data"
if 'content-length' not in headers:
self.verb("Generating content-length header...")
header_list.append(
'-H "content-length: {}" \\ \n'.format(len(encoded_data)))
return ("""curl -vX POST {url} \\\n{headers}{data}""".format(
@ -312,12 +329,15 @@ class WebPusher:
# gcm keys are all about 40 chars (use 100 for confidence),
# fcm keys are 153-175 chars
if len(gcm_key) < 100:
self.verb("Guessing this is legacy GCM...")
endpoint = 'https://android.googleapis.com/gcm/send'
else:
self.verb("Guessing this is FCM...")
endpoint = 'https://fcm.googleapis.com/fcm/send'
reg_ids = []
if not reg_id:
reg_id = self.subscription_info['endpoint'].rsplit('/', 1)[-1]
self.verb("Fetching out registration id: {}", reg_id)
reg_ids.append(reg_id)
gcm_data = dict()
gcm_data['registration_ids'] = reg_ids
@ -336,15 +356,22 @@ class WebPusher:
endpoint = self.subscription_info['endpoint']
if 'ttl' not in headers or ttl:
self.verb("Generating TTL of 0...")
headers['ttl'] = str(ttl or 0)
# Additionally useful headers:
# Authorization / Crypto-Key (VAPID headers)
if curl:
return self.as_curl(endpoint, encoded_data, headers)
return self.requests_method.post(endpoint,
self.verb("\nSending request to"
"\n\thost: {}\n\theaders: {}\n\tdata: {}",
endpoint, headers, encoded_data)
resp = self.requests_method.post(endpoint,
data=encoded_data,
headers=headers,
timeout=timeout)
self.verb("\nResponse:\n\tcode: {}\n\tbody: {}\n",
resp.status_code, resp.text or "Empty")
return resp
def webpush(subscription_info,
@ -354,7 +381,8 @@ def webpush(subscription_info,
content_encoding="aes128gcm",
curl=False,
timeout=None,
ttl=0):
ttl=0,
verbose=False):
"""
One call solution to endcode and send `data` to the endpoint
contained in `subscription_info` using optional VAPID auth headers.
@ -396,11 +424,15 @@ def webpush(subscription_info,
:type timeout: float or tuple
:param ttl: Time To Live
:type ttl: int
:param verbose: Provide verbose feedback
:type verbose: bool
:return requests.Response or string
"""
vapid_headers = None
if vapid_claims:
if verbose:
print("Generating VAPID headers...")
if not vapid_claims.get('aud'):
url = urlparse(subscription_info.get('endpoint'))
aud = "{}://{}".format(url.scheme, url.netloc)
@ -411,6 +443,9 @@ def webpush(subscription_info,
or vapid_claims.get('exp') < int(time.time())):
# encryption lives for 12 hours
vapid_claims['exp'] = int(time.time()) + (12 * 60 * 60)
if verbose:
print("Setting VAPID expry to {}...".format(
vapid_claims['exp']))
if not vapid_private_key:
raise WebPushException("VAPID dict missing 'private_key'")
if isinstance(vapid_private_key, Vapid):
@ -422,8 +457,13 @@ def webpush(subscription_info,
private_key_file=vapid_private_key) # pragma no cover
else:
vv = Vapid.from_string(private_key=vapid_private_key)
if verbose:
print("\t claims: {}".format(vapid_claims))
vapid_headers = vv.sign(vapid_claims)
response = WebPusher(subscription_info).send(
if verbose:
print("\t headers: {}".format(vapid_headers))
response = WebPusher(subscription_info, verbose=verbose).send(
data,
vapid_headers,
ttl=ttl,
@ -432,7 +472,7 @@ def webpush(subscription_info,
timeout=timeout,
)
if not curl and response.status_code > 202:
raise WebPushException("Push failed: {} {}".format(
response.status_code, response.reason),
raise WebPushException("Push failed: {} {}\nResponse body:{}".format(
response.status_code, response.reason, response.text),
response=response)
return response

View File

@ -15,6 +15,8 @@ def get_config():
parser.add_argument("--curl", help="Don't send, display as curl command",
default=False, action="store_true")
parser.add_argument("--encoding", default="aes128gcm")
parser.add_argument("--verbose", "-v", help="Provide verbose feedback",
default=False, action="store_true")
args = parser.parse_args()
@ -53,7 +55,8 @@ def main():
vapid_private_key=args.key,
vapid_claims=args.claims,
curl=args.curl,
content_encoding=args.encoding)
content_encoding=args.encoding,
verbose=args.verbose)
print(result)
except Exception as ex:
print("ERROR: {}".format(ex))

View File

@ -169,9 +169,8 @@ class WebpushTestCase(unittest.TestCase):
).decode('utf8')
)
ok_(subscription_info.get('endpoint').startswith(auth['aud']))
ok_('WebPush' in pheaders.get('authorization'))
ok_('vapid' in pheaders.get('authorization'))
ckey = pheaders.get('crypto-key')
ok_('p256ecdsa=' in ckey)
ok_('dh=' in ckey)
eq_(pheaders.get('content-encoding'), 'aesgcm')
@ -303,13 +302,12 @@ class WebpushTestCase(unittest.TestCase):
)
for s in [
"curl -vX POST https://example.com",
"-H \"crypto-key: p256ecdsa=",
"-H \"content-encoding: aes128gcm\"",
"-H \"authorization: WebPush ",
"-H \"authorization: vapid ",
"-H \"ttl: 0\"",
"-H \"content-length:"
]:
ok_(s in result)
ok_(s in result, "missing: {}".format(s))
def test_ci_dict(self):
ci = CaseInsensitiveDict({"Foo": "apple", "bar": "banana"})

View File

@ -4,7 +4,7 @@ import os
from setuptools import find_packages, setup
__version__ = "1.9.4"
__version__ = "1.10.0"
def read_from(file):