# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Google API Extensions"""
from __future__ import absolute_import
import collections
import logging
import multiprocessing as mp
import dill
from grpc import RpcError, StatusCode
import pkg_resources
from google.gax.errors import GaxError
from google.gax.retry import retryable
from google.rpc import code_pb2
# pylint: disable=no-member
__version__ = pkg_resources.get_distribution('google-gax').version
# pylint: enable=no-member
_LOG = logging.getLogger(__name__)
_LOG.addHandler(logging.NullHandler())
INITIAL_PAGE = object()
"""A placeholder for the page token passed into an initial paginated request."""
OPTION_INHERIT = object()
"""Global constant.
If a CallOptions field is set to OPTION_INHERIT, the call to which that
CallOptions belongs will attempt to inherit that field from its default
settings."""
class _CallSettings(object):
"""Encapsulates the call settings for an API call."""
# pylint: disable=too-few-public-methods
def __init__(self, timeout=30, retry=None, page_descriptor=None,
page_token=None, bundler=None, bundle_descriptor=None,
kwargs=None):
"""Constructor.
Args:
timeout (int): The client-side timeout for API calls. This
parameter is ignored for retrying calls.
retry (RetryOptions): The configuration for retrying upon
transient error. If set to None, this call will not retry.
page_descriptor (PageDescriptor): indicates the structure
of page streaming to be performed. If set to None, page streaming
is disabled.
page_token (str): If there is no ``page_descriptor``, this attribute
has no meaning. Otherwise, determines the page token used in the
page streaming request.
bundler (gax.bundling.Executor): orchestrates bundling. If
None, bundling is not performed.
bundle_descriptor (BundleDescriptor): indicates the
structure of of the bundle. If None, bundling is disabled.
kwargs (dict): other keyword arguments to be passed to the API
calls.
"""
self.timeout = timeout
self.retry = retry
self.page_descriptor = page_descriptor
self.page_token = page_token
self.bundler = bundler
self.bundle_descriptor = bundle_descriptor
self.kwargs = kwargs or {}
@property
def flatten_pages(self):
"""
A boolean property indicating whether a page streamed response should
make the page structure transparent to the user by flattening the
repeated field in the returned iterator.
There is no ``page_descriptor``, this means nothing.
"""
return self.page_token is None
def merge(self, options):
"""Returns new _CallSettings merged from this and a CallOptions object.
Note that passing if the CallOptions instance specifies a page_token,
the merged _CallSettings will have ``flatten_pages`` disabled. This
permits toggling per-resource/per-page page streaming.
Args:
options (CallOptions): an instance whose values override
those in this object. If None, ``merge`` returns a copy of this
object
Returns:
CallSettings: The merged settings and options.
"""
if not options:
return _CallSettings(
timeout=self.timeout, retry=self.retry,
page_descriptor=self.page_descriptor,
page_token=self.page_token,
bundler=self.bundler, bundle_descriptor=self.bundle_descriptor,
kwargs=self.kwargs)
else:
if options.timeout == OPTION_INHERIT:
timeout = self.timeout
else:
timeout = options.timeout
if options.retry == OPTION_INHERIT:
retry = self.retry
else:
retry = options.retry
if options.page_token == OPTION_INHERIT:
page_token = self.page_token
else:
page_token = options.page_token
if options.is_bundling:
bundler = self.bundler
else:
bundler = None
if options.kwargs == OPTION_INHERIT:
kwargs = self.kwargs
else:
kwargs = self.kwargs.copy()
kwargs.update(options.kwargs)
return _CallSettings(
timeout=timeout, retry=retry,
page_descriptor=self.page_descriptor, page_token=page_token,
bundler=bundler, bundle_descriptor=self.bundle_descriptor,
kwargs=kwargs)
class CallOptions(object):
    """Encapsulates the overridable settings for a particular API call.

    ``CallOptions`` is an optional arg for all GAX API calls. It is used to
    configure the settings of a specific API call.

    When provided, its values override the GAX service defaults for that
    particular call.
    """
    # pylint: disable=too-few-public-methods
    def __init__(self, timeout=OPTION_INHERIT, retry=OPTION_INHERIT,
                 page_token=OPTION_INHERIT, is_bundling=False, **kwargs):
        """
        Example:
            >>> # change an api call's timeout
            >>> o1 = CallOptions(timeout=30)  # make the timeout 30 seconds
            >>>
            >>> # set page streaming to be per-page on a call where it is
            >>> # normally per-resource
            >>> o2 = CallOptions(page_token=INITIAL_PAGE)
            >>>
            >>> # disable retrying on an api call that normally retries
            >>> o3 = CallOptions(retry=None)
            >>>
            >>> # enable bundling on a call that supports it
            >>> o4 = CallOptions(is_bundling=True)

        Args:
            timeout (int): The client-side timeout for non-retrying API calls.
            retry (RetryOptions): determines whether and how to retry
                on transient errors. When set to None, the call will not
                retry.
            page_token (str): If set and the call is configured for page
                streaming, page streaming is performed per-page, starting
                with this page_token. Use ``INITIAL_PAGE`` for the first
                request. If unset and the call is configured for page
                streaming, page streaming is performed per-resource.
            is_bundling (bool): If set and the call is configured for
                bundling, bundling is performed. Bundling is always disabled
                by default.
            kwargs: Additional arguments passed through to the API call.

        Raises:
            ValueError: if incompatible options are specified.
        """
        # A client-side timeout only governs non-retrying calls; when a retry
        # configuration is given, its backoff settings control timing, so
        # specifying both together is rejected up front.
        if not (timeout == OPTION_INHERIT or retry == OPTION_INHERIT):
            raise ValueError('The CallOptions has incompatible settings: '
                             '"timeout" cannot be specified on a retrying call')
        self.timeout = timeout
        self.retry = retry
        self.page_token = page_token
        self.is_bundling = is_bundling
        # Empty kwargs is treated the same as "not specified": inherit the
        # defaults from the call's _CallSettings.
        self.kwargs = kwargs or OPTION_INHERIT
class PageDescriptor(
        collections.namedtuple(
            'PageDescriptor',
            ['request_page_token_field',
             'response_page_token_field',
             'resource_field'])):
    """Describes the structure of a page-streaming call.

    Attributes:
        request_page_token_field: the field of the request message that
            carries the page token to send.
        response_page_token_field: the field of the response message that
            carries the token for the next page.
        resource_field: the repeated field of the response message that
            carries the resources of a page.
    """
    pass
class RetryOptions(
        collections.namedtuple(
            'RetryOptions',
            ['retry_codes',
             'backoff_settings'])):
    """Per-call configurable settings for retrying upon transient failure.

    Attributes:
        retry_codes (list[string]): a list of Google API canonical error
            codes upon which a retry should be attempted.
        backoff_settings (:class:`BackoffSettings`): configures the retry
            exponential backoff algorithm.
    """
    pass
class BackoffSettings(
        collections.namedtuple(
            'BackoffSettings',
            ['initial_retry_delay_millis',
             'retry_delay_multiplier',
             'max_retry_delay_millis',
             'initial_rpc_timeout_millis',
             'rpc_timeout_multiplier',
             'max_rpc_timeout_millis',
             'total_timeout_millis'])):
    """Parameters to the exponential backoff algorithm for retrying.

    Attributes:
        initial_retry_delay_millis: the initial delay time, in milliseconds,
            between the completion of the first failed request and the
            initiation of the first retrying request.
        retry_delay_multiplier: the multiplier by which to increase the delay
            time between the completion of failed requests, and the
            initiation of the subsequent retrying request.
        max_retry_delay_millis: the maximum delay time, in milliseconds,
            between requests. When this value is reached,
            ``retry_delay_multiplier`` will no longer be used to increase
            delay time.
        initial_rpc_timeout_millis: the initial timeout parameter to the
            request.
        rpc_timeout_multiplier: the multiplier by which to increase the
            timeout parameter between failed requests.
        max_rpc_timeout_millis: the maximum timeout parameter, in
            milliseconds, for a request. When this value is reached,
            ``rpc_timeout_multiplier`` will no longer be used to increase
            the timeout.
        total_timeout_millis: the total time, in milliseconds, starting from
            when the initial request is sent, after which an error will be
            returned, regardless of the retrying attempts made meanwhile.
    """
    pass
class BundleDescriptor(
        collections.namedtuple(
            'BundleDescriptor',
            ['bundled_field',
             'request_discriminator_fields',
             'subresponse_field'])):
    """Describes the structure of bundled call.

    request_discriminator_fields may include '.' as a separator, which is
    used to indicate object traversal. This allows fields in nested objects
    to be used to determine what requests to bundle.

    Attributes:
        bundled_field: the repeated field in the request message that
            will have its elements aggregated by bundling
        request_discriminator_fields: a list of fields in the
            target request message class that are used to determine
            which messages should be bundled together.
        subresponse_field: an optional field, when present it indicates the
            field in the response message that should be used to demultiplex
            the response into multiple response messages.
    """
    def __new__(cls,
                bundled_field,
                request_discriminator_fields,
                subresponse_field=None):
        """Invokes the base constructor with ``subresponse_field``
        defaulting to None."""
        # Pass ``(BundleDescriptor, cls)`` to super() in that order; the
        # previously swapped arguments raised TypeError whenever a subclass
        # of BundleDescriptor was instantiated.
        return super(BundleDescriptor, cls).__new__(
            cls,
            bundled_field,
            request_discriminator_fields,
            subresponse_field)
class BundleOptions(
        collections.namedtuple(
            'BundleOptions',
            ['element_count_threshold',
             'element_count_limit',
             'request_byte_threshold',
             'request_byte_limit',
             'delay_threshold'])):
    """Holds values used to configure bundling.

    The xxx_threshold attributes are used to configure when the bundled
    request should be made.

    Attributes:
        element_count_threshold: the bundled request will be sent once the
            count of outstanding elements in the repeated field reaches this
            value.
        element_count_limit: represents a hard limit on the number of
            elements in the repeated field of the bundle; if adding a request
            to a bundle would exceed this value, the bundle is sent and the
            new request is added to a fresh bundle. It is invalid for a
            single request to exceed this limit.
        request_byte_threshold: the bundled request will be sent once the
            count of bytes in the request reaches this value. Note that this
            value is pessimistically approximated by summing the bytesizes of
            the elements in the repeated field, and therefore may be an
            under-approximation.
        request_byte_limit: represents a hard limit on the size of the
            bundled request; if adding a request to a bundle would exceed
            this value, the bundle is sent and the new request is added to a
            fresh bundle. It is invalid for a single request to exceed this
            limit. Note that this value is pessimistically approximated by
            summing the bytesizes of the elements in the repeated field, with
            a buffer applied to correspond to the resulting
            under-approximation.
        delay_threshold: the bundled request will be sent this amount of
            time after the first element in the bundle was added to it.
    """
    # pylint: disable=too-few-public-methods
    def __new__(cls,
                element_count_threshold=0,
                element_count_limit=0,
                request_byte_threshold=0,
                request_byte_limit=0,
                delay_threshold=0):
        """Invokes the base constructor with default values.

        The default values are zero for all attributes and it's necessary to
        specify at least one valid threshold value during construction.

        Args:
            element_count_threshold (int): the bundled request will be sent
                once the count of outstanding elements in the repeated field
                reaches this value.
            element_count_limit (int): represents a hard limit on the number
                of elements in the repeated field of the bundle; if adding a
                request to a bundle would exceed this value, the bundle is
                sent and the new request is added to a fresh bundle. It is
                invalid for a single request to exceed this limit.
            request_byte_threshold (int): the bundled request will be sent
                once the count of bytes in the request reaches this value.
                Note that this value is pessimistically approximated by
                summing the bytesizes of the elements in the repeated field,
                with a buffer applied to compensate for the corresponding
                under-approximation.
            request_byte_limit (int): represents a hard limit on the size of
                the bundled request; if adding a request to a bundle would
                exceed this value, the bundle is sent and the new request is
                added to a fresh bundle. It is invalid for a single request
                to exceed this limit. Note that this value is pessimistically
                approximated by summing the bytesizes of the elements in the
                repeated field, with a buffer applied to correspond to the
                resulting under-approximation.
            delay_threshold (int): the bundled request will be sent this
                amount of time after the first element in the bundle was
                added to it.

        Returns:
            BundleOptions: the constructed object.

        Raises:
            AssertionError: if arguments are not ints, or if no threshold is
                positive. (Kept as ``assert`` for backward compatibility with
                callers that catch AssertionError; note asserts are stripped
                under ``python -O``.)
        """
        assert isinstance(element_count_threshold, int), 'should be an int'
        assert isinstance(element_count_limit, int), 'should be an int'
        assert isinstance(request_byte_threshold, int), 'should be an int'
        assert isinstance(request_byte_limit, int), 'should be an int'
        assert isinstance(delay_threshold, int), 'should be an int'
        assert (element_count_threshold > 0 or
                request_byte_threshold > 0 or
                delay_threshold > 0), 'one threshold should be > 0'
        # Pass ``(BundleOptions, cls)`` to super() in that order; the
        # previously swapped arguments raised TypeError whenever a subclass
        # of BundleOptions was instantiated.
        return super(BundleOptions, cls).__new__(
            cls,
            element_count_threshold,
            element_count_limit,
            request_byte_threshold,
            request_byte_limit,
            delay_threshold)
class PageIterator(object):
    """An iterator over the pages of a page streaming API call.

    Provides access to the individual pages of the call, as well as the page
    token.

    Attributes:
        response: The full response message for the call most recently made,
            or None if a call has not yet been made.
        page_token: The page token to be passed in the request for the next
            call to be made.
    """
    # pylint: disable=too-few-public-methods
    def __init__(self, api_call, page_descriptor, page_token, request,
                 **kwargs):
        """
        Args:
            api_call (Callable[[req], resp]): an API call that is page
                streaming.
            page_descriptor (PageDescriptor): indicates the structure
                of page streaming to be performed.
            page_token (str): The page token to be passed to API call
                request. If no page token has yet been acquired, this field
                should be set to ``INITIAL_PAGE``.
            request (object): The request to be passed to the API call. The
                page token field of the request is overwritten by the
                ``page_token`` passed to the constructor, unless
                ``page_token`` is ``INITIAL_PAGE``.
            kwargs: Arbitrary keyword arguments to be passed to the API call.
        """
        self.response = None
        # Treat a falsy token (None, empty string) as "start from the top".
        self.page_token = page_token or INITIAL_PAGE
        self._func = api_call
        self._page_descriptor = page_descriptor
        self._request = request
        self._kwargs = kwargs
        self._done = False

    def __iter__(self):
        return self

    def next(self):
        """For Python 2.7 compatibility; see __next__."""
        return self.__next__()

    def __next__(self):
        """Retrieves the next page."""
        if self._done:
            raise StopIteration
        # Only overwrite the request's token once a real token is known; on
        # the very first call the request is sent exactly as constructed.
        if self.page_token != INITIAL_PAGE:
            setattr(self._request,
                    self._page_descriptor.request_page_token_field,
                    self.page_token)
        response = self._func(self._request, **self._kwargs)
        self.page_token = getattr(
            response, self._page_descriptor.response_page_token_field)
        # An empty next-page token signals the final page; stop after
        # returning this page's resources.
        if not self.page_token:
            self._done = True
        return getattr(response, self._page_descriptor.resource_field)
class ResourceIterator(object):
    """An iterator over resources of the page iterator."""
    # pylint: disable=too-few-public-methods
    def __init__(self, page_iterator):
        """Constructor.

        Args:
            page_iterator (PageIterator): the base iterator of getting pages.
        """
        self._page_iterator = page_iterator
        self._current = None
        self._index = -1

    def __iter__(self):
        return self

    def next(self):
        """For Python 2.7 compatibility; see __next__."""
        return self.__next__()

    def __next__(self):
        """Retrieves the next resource."""
        # pylint: disable=next-method-called
        # Advance past empty/exhausted pages; a StopIteration raised by the
        # page iterator propagates and terminates this iterator too.
        while not self._current:
            self._current = next(self._page_iterator)
            self._index = 0
        resource = self._current[self._index]
        self._index += 1
        # Drop the page once every resource in it has been yielded.
        if self._index >= len(self._current):
            self._current = None
        return resource
def _from_any(pb_type, any_pb):
"""Converts an Any protobuf to the specified message type
Args:
pb_type (type): the type of the message that any_pb stores an instance
of.
any_pb (google.protobuf.any_pb2.Any): the object to be converted.
Returns:
pb_type: An instance of the pb_type message.
Raises:
TypeError: if the message could not be converted.
"""
msg = pb_type()
# Check exceptional case: raise if can't Unpack
if not any_pb.Unpack(msg):
raise TypeError(
'Could not convert {} to {}'.format(
any_pb.__class__.__name__, pb_type.__name__))
# Return expected message
return msg
def _try_callback(target, clbk):
try:
clbk(target)
except Exception as ex: # pylint: disable=broad-except
_LOG.exception(ex)
class _DeadlineExceededError(RpcError, GaxError):
    """Internal error raised while a long-running operation is unfinished.

    Raised by the done-check inside ``_OperationFuture._poll`` so that the
    retry machinery (which matches on ``code()``) keeps polling until the
    operation completes or its backoff deadline elapses.
    """
    def __init__(self):
        super(_DeadlineExceededError, self).__init__('Deadline Exceeded')

    def code(self):  # pylint: disable=no-self-use
        """Always returns StatusCode.DEADLINE_EXCEEDED"""
        return StatusCode.DEADLINE_EXCEEDED
class _OperationFuture(object):
    """A Future which polls a service for completion via OperationsClient."""

    def __init__(self, operation, client, result_type, metadata_type,
                 call_options=None):
        """
        Args:
            operation (google.longrunning.Operation): the initial long-running
                operation object.
            client
                (google.gapic.longrunning.operations_client.OperationsClient):
                a client for the long-running operation service.
            result_type (type): the class type of the result.
            metadata_type (Optional[type]): the class type of the metadata.
            call_options (Optional[google.gax.CallOptions]): the call options
                that are used when reloading the operation.
        """
        self._operation = operation
        self._client = client
        self._result_type = result_type
        self._metadata_type = metadata_type
        self._call_options = call_options
        # Queue of callbacks registered before completion, serialized with
        # dill (presumably because mp.Queue pickles its items and plain
        # pickle cannot handle arbitrary callables — TODO confirm).
        self._queue = mp.Queue()
        # Lazily-started background process that polls to completion and then
        # drains the callback queue; see add_done_callback/_execute_tasks.
        self._process = None

    def cancel(self):
        """If last Operation's value of `done` is true, returns false;
        otherwise, issues OperationsClient.cancel_operation and returns true.
        """
        if self.done():
            return False
        self._client.cancel_operation(self._operation.name)
        return True

    def result(self, timeout=None):
        """Enters polling loop on OperationsClient.get_operation, and once
        Operation.done is true, then returns Operation.response if successful
        or throws GaxError if not successful.

        This method will wait up to timeout seconds. If the call hasn't
        completed in timeout seconds, then a RetryError will be raised.
        timeout can be an int or float. If timeout is not specified or None,
        there is no limit to the wait time.
        """
        # Check exceptional case: raise if no response
        if not self._poll(timeout).HasField('response'):
            raise GaxError(self._operation.error.message)
        # Return expected result
        return _from_any(self._result_type, self._operation.response)

    def exception(self, timeout=None):
        """Similar to result(), except returns the exception if any."""
        # Check exceptional case: return none if no error
        if not self._poll(timeout).HasField('error'):
            return None
        # Return expected error
        return self._operation.error

    def cancelled(self):
        """Return True if the call was successfully cancelled."""
        # Refresh first: a cancellation surfaces as an error field whose code
        # is CANCELLED on the reloaded operation.
        self._get_operation()
        return (self._operation.HasField('error') and
                self._operation.error.code == code_pb2.CANCELLED)

    def done(self):
        """Issues OperationsClient.get_operation and returns value of
        Operation.done.
        """
        return self._get_operation().done

    def add_done_callback(self, fn):  # pylint: disable=invalid-name
        """Enters a polling loop on OperationsClient.get_operation, and once
        the operation is done or cancelled, calls the function with this
        _OperationFuture. Added callables are called in the order that they
        were added.
        """
        if self._operation.done:
            # Already complete: run the callback immediately, in-process.
            _try_callback(self, fn)
        else:
            # Not complete: queue the callback and make sure exactly one
            # background process is polling for completion.
            self._queue.put(dill.dumps(fn))
            if self._process is None:
                self._process = mp.Process(target=self._execute_tasks)
                self._process.start()

    def operation_name(self):
        """Returns the value of Operation.name."""
        return self._operation.name

    def metadata(self):
        """Returns the value of Operation.metadata from the last call to
        OperationsClient.get_operation (or if only the initial API call has
        been made, the metadata from that first call).
        """
        # Check exceptional case: return none if no metadata
        if not self._operation.HasField('metadata'):
            return None
        # Return expected metadata
        return _from_any(self._metadata_type, self._operation.metadata)

    def last_operation_data(self):
        """Returns the data from the last call to
        OperationsClient.get_operation (or if only the initial API call has
        been made, the data from that first call).
        """
        return self._operation

    def _get_operation(self):
        # Refresh the cached operation from the service, but only while it is
        # still in progress; a done operation is final and never re-fetched.
        if not self._operation.done:
            self._operation = self._client.get_operation(
                self._operation.name, self._call_options)
        return self._operation

    def _poll(self, timeout=None):
        # Blocks until the operation is done (or the timeout elapses) by
        # retrying _done_check under the backoff settings below; returns the
        # final operation.
        def _done_check(_):
            # Check exceptional case: raise if in progress
            if not self.done():
                raise _DeadlineExceededError()
            # Return expected operation
            return self._operation
        # If a timeout is set, then convert it to milliseconds.
        #
        # Also, we need to send 0 instead of None for the rpc arguments,
        # because an internal method (`_has_timeout_settings`) will
        # erroneously return False otherwise.
        rpc_arg = None
        if timeout is not None:
            timeout *= 1000
            rpc_arg = 0
        # Set the backoff settings. We have specific backoff settings
        # for "are we there yet" calls that are distinct from those configured
        # in the config.json files.
        backoff_settings = BackoffSettings(
            initial_retry_delay_millis=1000,
            retry_delay_multiplier=2,
            max_retry_delay_millis=30000,
            initial_rpc_timeout_millis=rpc_arg,
            rpc_timeout_multiplier=rpc_arg,
            max_rpc_timeout_millis=rpc_arg,
            total_timeout_millis=timeout,
        )
        # Set the retry to retry if `_done_check` raises the
        # _DeadlineExceededError, according to the given backoff settings.
        retry_options = RetryOptions(
            [StatusCode.DEADLINE_EXCEEDED], backoff_settings)
        retryable_done_check = retryable(_done_check, retry_options)
        # Start polling, and return the final result from `_done_check`.
        return retryable_done_check()

    def _execute_tasks(self):
        # Runs in the background process: wait (with no time limit) for the
        # operation to finish, then invoke each queued callback in FIFO
        # order, deserializing with dill to mirror add_done_callback.
        self._poll()
        while not self._queue.empty():
            task = dill.loads(self._queue.get())
            _try_callback(self, task)