Source code for google.gax.api_callable

# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""Provides function wrappers that implement page streaming and retrying."""

from __future__ import absolute_import, division, unicode_literals

from future import utils

from google import gax
from google.gax import bundling
from google.gax.utils import metrics

_MILLIS_PER_SECOND = 1000


def _bundleable(desc):
    """Creates a function that transforms an API call into a bundling call.

    It transform a_func from an API call that receives the requests and returns
    the response into a callable that receives the same request, and
    returns a :class:`bundling.Event`.

    The returned Event object can be used to obtain the eventual result of the
    bundled call.

    Args:
      desc (gax.BundleDescriptor): describes the bundling that a_func
        supports.

    Returns:
      Callable: takes the API call's request and keyword args and returns a
        bundling.Event object.

    """
    def inner(a_func, settings, request, **kwargs):
        """Schedules execution of a bundling task."""
        if not settings.bundler:
            return a_func(request, **kwargs)

        the_id = bundling.compute_bundle_id(
            request, desc.request_discriminator_fields)
        return settings.bundler.schedule(a_func, the_id, desc, request, kwargs)

    return inner


def _page_streamable(page_descriptor):
    """Creates a function that yields an iterable to performs page-streaming.

    Args:
        page_descriptor (:class:`PageDescriptor`): indicates the structure
          of page streaming to be performed.

    Returns:
        Callable: A function that returns an iterator.
    """

    def inner(a_func, settings, request, **kwargs):
        """Actual page-streaming based on the settings."""
        page_iterator = gax.PageIterator(
            a_func, page_descriptor, settings.page_token, request, **kwargs)
        if settings.flatten_pages:
            return gax.ResourceIterator(page_iterator)
        else:
            return page_iterator

    return inner


def _construct_bundling(bundle_config, bundle_descriptor):
    """Helper for ``construct_settings()``.

    Args:
      bundle_config (dict): A dictionary specifying a bundle parameters, the
        value for 'bundling' field in a method config (See
        ``construct_settings()`` for information on this config.)
      bundle_descriptor (BundleDescriptor): A BundleDescriptor object
        describing the structure of bundling for this method. If not set,
        this method will not bundle.

    Returns:
      Tuple[bundling.Executor, BundleDescriptor]: A tuple that configures
        bundling. The bundling.Executor may be None if this method should not
        bundle.
    """
    if bundle_config and bundle_descriptor:
        bundler = bundling.Executor(gax.BundleOptions(
            element_count_threshold=bundle_config.get(
                'element_count_threshold', 0),
            element_count_limit=bundle_config.get('element_count_limit', 0),
            request_byte_threshold=bundle_config.get(
                'request_byte_threshold', 0),
            request_byte_limit=bundle_config.get('request_byte_limit', 0),
            delay_threshold=bundle_config.get('delay_threshold_millis', 0)))
    else:
        bundler = None

    return bundler


def _construct_retry(method_config, retry_codes, retry_params, retry_names):
    """Helper for ``construct_settings()``.

    Args:
      method_config (dict): A dictionary representing a single ``methods``
        entry of the standard API client config file. (See
        ``construct_settings()`` for information on this yaml.)
      retry_codes (dict): A dictionary parsed from the ``retry_codes`` entry
        of the standard API client config file. (See ``construct_settings()``
        for information on this yaml.)
      retry_params (dict): A dictionary parsed from the ``retry_params`` entry
        of the standard API client config file. (See ``construct_settings()``
        for information on this yaml.)
      retry_names (dict): A dictionary mapping the string names used in the
        standard API client config file to API response status codes.

    Returns:
      Optional[RetryOptions]: The retry options, if applicable.
    """
    if method_config is None:
        return None

    codes = None
    if retry_codes and 'retry_codes_name' in method_config:
        codes_name = method_config['retry_codes_name']
        if codes_name in retry_codes and retry_codes[codes_name]:
            codes = [retry_names[name] for name in retry_codes[codes_name]]
        else:
            codes = []

    backoff_settings = None
    if retry_params and 'retry_params_name' in method_config:
        params_name = method_config['retry_params_name']
        if params_name and params_name in retry_params:
            backoff_settings = gax.BackoffSettings(**retry_params[params_name])

    return gax.RetryOptions(
        backoff_settings=backoff_settings,
        retry_codes=codes,
    )


def _merge_retry_options(retry_options, overrides):
    """Helper for ``construct_settings()``.

    Takes two retry options, and merges them into a single RetryOption instance.

    Args:
      retry_options (RetryOptions): The base RetryOptions.
      overrides (RetryOptions): The RetryOptions used for overriding ``retry``.
        Use the values if it is not None. If entire ``overrides`` is None,
        ignore the base retry and return None.

    Returns:
      RetryOptions: The merged options, or None if it will be canceled.
    """
    if overrides is None:
        return None

    if overrides.retry_codes is None and overrides.backoff_settings is None:
        return retry_options

    codes = retry_options.retry_codes
    if overrides.retry_codes is not None:
        codes = overrides.retry_codes
    backoff_settings = retry_options.backoff_settings
    if overrides.backoff_settings is not None:
        backoff_settings = overrides.backoff_settings

    return gax.RetryOptions(
        backoff_settings=backoff_settings,
        retry_codes=codes,
    )


def _upper_camel_to_lower_under(string):
    if not string:
        return ''
    out = ''
    out += string[0].lower()
    for char in string[1:]:
        if char.isupper():
            out += '_' + char.lower()
        else:
            out += char
    return out


[docs]def construct_settings( service_name, client_config, config_override, retry_names, bundle_descriptors=None, page_descriptors=None, metrics_headers=(), kwargs=None): """Constructs a dictionary mapping method names to _CallSettings. The ``client_config`` parameter is parsed from a client configuration JSON file of the form: .. code-block:: json { "interfaces": { "google.fake.v1.ServiceName": { "retry_codes": { "idempotent": ["UNAVAILABLE", "DEADLINE_EXCEEDED"], "non_idempotent": [] }, "retry_params": { "default": { "initial_retry_delay_millis": 100, "retry_delay_multiplier": 1.2, "max_retry_delay_millis": 1000, "initial_rpc_timeout_millis": 2000, "rpc_timeout_multiplier": 1.5, "max_rpc_timeout_millis": 30000, "total_timeout_millis": 45000 } }, "methods": { "CreateFoo": { "retry_codes_name": "idempotent", "retry_params_name": "default", "timeout_millis": 30000 }, "Publish": { "retry_codes_name": "non_idempotent", "retry_params_name": "default", "bundling": { "element_count_threshold": 40, "element_count_limit": 200, "request_byte_threshold": 90000, "request_byte_limit": 100000, "delay_threshold_millis": 100 } } } } } } Args: service_name (str): The fully-qualified name of this service, used as a key into the client config file (in the example above, this value would be ``google.fake.v1.ServiceName``). client_config (dict): A dictionary parsed from the standard API client config file. bundle_descriptors (Mapping[str, BundleDescriptor]): A dictionary of method names to BundleDescriptor objects for methods that are bundling-enabled. page_descriptors (Mapping[str, PageDescriptor]): A dictionary of method names to PageDescriptor objects for methods that are page streaming-enabled. config_override (str): A dictionary in the same structure of client_config to override the settings. Usually client_config is supplied from the default config and config_override will be specified by users. retry_names (Mapping[str, object]): A dictionary mapping the strings referring to response status codes to the Python objects representing those codes. metrics_headers (Mapping[str, str]): Dictionary of headers to be passed for analytics. Sent as a dictionary; eventually becomes a space-separated string (e.g. 'foo/1.0.0 bar/3.14.1'). kwargs (dict): The keyword arguments to be passed to the API calls. Returns: dict: A dictionary mapping method names to _CallSettings. Raises: KeyError: If the configuration for the service in question cannot be located in the provided ``client_config``. """ # pylint: disable=too-many-locals # pylint: disable=protected-access defaults = {} bundle_descriptors = bundle_descriptors or {} page_descriptors = page_descriptors or {} kwargs = kwargs or {} # Sanity check: It is possible that we got this far but some headers # were specified with an older library, which sends them as... # kwargs={'metadata': [('x-goog-api-client', 'foo/1.0 bar/3.0')]} # # Note: This is the final format we will send down to GRPC shortly. # # Remove any x-goog-api-client header that may have been present # in the metadata list. if 'metadata' in kwargs: kwargs['metadata'] = [value for value in kwargs['metadata'] if value[0].lower() != 'x-goog-api-client'] # Fill out the metrics headers with GAX and GRPC info, and convert # to a string in the format that the GRPC layer expects. kwargs.setdefault('metadata', []) kwargs['metadata'].append( ('x-goog-api-client', metrics.stringify(metrics.fill(metrics_headers))) ) try: service_config = client_config['interfaces'][service_name] except KeyError: raise KeyError('Client configuration not found for service: {}' .format(service_name)) overrides = config_override.get('interfaces', {}).get(service_name, {}) for method in service_config.get('methods'): method_config = service_config['methods'][method] overriding_method = overrides.get('methods', {}).get(method, {}) snake_name = _upper_camel_to_lower_under(method) if overriding_method and overriding_method.get('timeout_millis'): timeout = overriding_method['timeout_millis'] else: timeout = method_config['timeout_millis'] timeout /= _MILLIS_PER_SECOND bundle_descriptor = bundle_descriptors.get(snake_name) bundling_config = method_config.get('bundling', None) if overriding_method and 'bundling' in overriding_method: bundling_config = overriding_method['bundling'] bundler = _construct_bundling(bundling_config, bundle_descriptor) retry_options = _merge_retry_options( _construct_retry(method_config, service_config['retry_codes'], service_config['retry_params'], retry_names), _construct_retry(overriding_method, overrides.get('retry_codes'), overrides.get('retry_params'), retry_names)) defaults[snake_name] = gax._CallSettings( timeout=timeout, retry=retry_options, page_descriptor=page_descriptors.get(snake_name), bundler=bundler, bundle_descriptor=bundle_descriptor, kwargs=kwargs) return defaults
def _catch_errors(a_func, to_catch): """Updates a_func to wrap exceptions with GaxError Args: a_func (callable): A callable. to_catch (list[Exception]): Configures the exceptions to wrap. Returns: Callable: A function that will wrap certain exceptions with GaxError """ def inner(*args, **kwargs): """Wraps specified exceptions""" try: return a_func(*args, **kwargs) # pylint: disable=catching-non-exception except tuple(to_catch) as exception: utils.raise_with_traceback( gax.errors.create_error('RPC failed', cause=exception)) return inner def _merge_options_metadata(options, settings): """Merge metadata list (add all missing tuples)""" if not options: return options kwargs = options.kwargs if kwargs == gax.OPTION_INHERIT or 'metadata' not in kwargs: return options kwarg_meta_dict = {} merged_kwargs = options.kwargs.copy() for kwarg_meta in merged_kwargs['metadata']: kwarg_meta_dict[kwarg_meta[0].lower()] = kwarg_meta for kwarg_meta in settings.kwargs['metadata']: if kwarg_meta[0].lower() not in kwarg_meta_dict: merged_kwargs['metadata'].append(kwarg_meta) return gax.CallOptions( timeout=options.timeout, retry=options.retry, page_token=options.page_token, is_bundling=options.is_bundling, **merged_kwargs)
[docs]def create_api_call(func, settings): """Converts an rpc call into an API call governed by the settings. In typical usage, ``func`` will be a callable used to make an rpc request. This will mostly likely be a bound method from a request stub used to make an rpc call. The result is created by applying a series of function decorators defined in this module to ``func``. ``settings`` is used to determine which function decorators to apply. The result is another callable which for most values of ``settings`` has has the same signature as the original. Only when ``settings`` configures bundling does the signature change. Args: func (Callable[Sequence[object], object]): is used to make a bare rpc call. settings (_CallSettings): provides the settings for this call Returns: Callable[Sequence[object], object]: a bound method on a request stub used to make an rpc call Raises: ValueError: if ``settings`` has incompatible values, e.g, if bundling and page_streaming are both configured """ def base_caller(api_call, _, *args): """Simply call api_call and ignore settings.""" return api_call(*args) def inner(request, options=None): """Invoke with the actual settings.""" this_options = _merge_options_metadata(options, settings) this_settings = settings.merge(this_options) if this_settings.retry and this_settings.retry.retry_codes: api_call = gax.retry.retryable( func, this_settings.retry, **this_settings.kwargs) else: api_call = gax.retry.add_timeout_arg( func, this_settings.timeout, **this_settings.kwargs) api_call = _catch_errors(api_call, gax.config.API_ERRORS) return api_caller(api_call, this_settings, request) if settings.page_descriptor: if settings.bundler and settings.bundle_descriptor: raise ValueError('The API call has incompatible settings: ' 'bundling and page streaming') api_caller = _page_streamable(settings.page_descriptor) elif settings.bundler and settings.bundle_descriptor: api_caller = _bundleable(settings.bundle_descriptor) else: api_caller = base_caller return inner