Source code for sparc.client.services.metadata
import json
import logging
from configparser import SectionProxy
from typing import List, Optional, Union
import requests
from requests.adapters import HTTPAdapter, Retry
from ._default import ServiceBase
[docs]
class MetadataService(ServiceBase):
    """A wrapper for the SciCrunch Elasticsearch metadata REST API.

    Parameters:
    -----------
    config : dict or configparser.SectionProxy, optional
        A configuration read through ``.get()``. The keys used are
        ``scicrunch_api_key`` and ``pennsieve_profile_name``.
    connect : bool
        Not needed with REST metadata services (accepted and ignored).

    Attributes:
    -----------
    default_headers : dict
        Base headers for HTTP requests. Shared by all instances; request
        methods copy it and never modify it.
    host_api : str
        A default HTTP address of the SciCrunch Elasticsearch API endpoint.
    algolia_api : str
        The search endpoint used by list_datasets() and search_datasets().
    scicrunch_api_key : str or None
        The API key sent in the ``apikey`` header to SciCrunch endpoints.
    profile_name : str or None
        Value of ``pennsieve_profile_name`` from the configuration.

    Methods:
    --------
    connect() -> str
        Not needed with REST metadata services; returns host_api.
    info() -> str
        Returns host_api.
    get_profile() -> str
        Returns the currently used API Key.
    set_profile() -> str
        Changes the API Key.
    close() -> str
        Not needed with REST metadata services; returns host_api.
    getURL(...) : dict
        Supporting function to retrieve data from REST endpoint via GET.
        This supports Elasticsearch URL based queries.
    postURL(...) : dict
        Supporting function to retrieve data from REST endpoint via POST.
        This supports Elasticsearch JSON queries.
    list_datasets(...) : dict
        Returns a dictionary with datasets metadata.
    search_datasets(...) : dict
        Returns a dictionary with datasets matching search criteria.
    """

    default_headers = {
        "Content-Type": "application/json",
        "Accept": "application/json; charset=utf-8",
    }
    scicrunch_api_key: Optional[str] = None
    profile_name: Optional[str] = None

    def __init__(
        self, config: Optional[Union[dict, SectionProxy]] = None, connect: bool = False
    ) -> None:
        logging.info("Initializing SPARC K-Core Elasticsearch services...")
        logging.debug(str(config))
        self.host_api = "https://api.scicrunch.io/elastic/v1"
        self.algolia_api = "https://api.scicrunch.io/elastic/v1/SPARC_Algolia_pr/_search"

        if config is not None:
            self.scicrunch_api_key = config.get("scicrunch_api_key")
            self.profile_name = config.get("pennsieve_profile_name")
            # Lazy %-formatting: a missing profile name is logged as "None"
            # instead of raising TypeError on str + None concatenation.
            logging.info("Profile: %s", self.profile_name)

        # Report the key status based on what was actually loaded.
        if self.scicrunch_api_key is None:
            logging.error("SciCrunch API Key: Not Found")
        else:
            logging.info("SciCrunch API Key: Found")

    def connect(self) -> str:
        """Not needed as metadata services are REST service calls.

        Returns:
        --------
        The base address of the metadata API (host_api).
        """
        logging.info("Metadata REST services available...")
        return self.host_api

    def info(self) -> str:
        """Returns information about the metadata search services.

        Returns:
        --------
        The base address of the metadata API (host_api).
        """
        return self.host_api

    def get_profile(self) -> Optional[str]:
        """Returns currently used API key.

        Returns:
        --------
        A string with API Key, or None when no key has been set.
        """
        return self.scicrunch_api_key

    def set_profile(self, api_key: str) -> str:
        """Changes the API key to the specified value.

        Parameters:
        -----------
        api_key : str
            The API key to use.

        Returns:
        --------
        The API key that is now in use.
        """
        self.scicrunch_api_key = api_key
        return self.scicrunch_api_key

    def close(self) -> str:
        """Not needed as metadata services are REST service calls.

        Returns:
        --------
        The base address of the metadata API (host_api); nothing is closed.
        """
        return self.host_api

    #####################################################################
    # Supporting Functions
    #####################################################################

    @staticmethod
    def _retrying_session():
        """Creates a requests session that retries failed HTTPS requests.

        Returns:
        --------
        A requests.Session whose "https://" adapter uses a retry policy of
        up to 6 attempts with backoff factor 1 for the listed status codes.
        """
        session = requests.Session()
        # NOTE(review): urllib3's default retry methods appear to exclude
        # POST, so status-based retries may not apply to postURL - confirm.
        retries = Retry(
            total=6,
            backoff_factor=1,
            status_forcelist=[404, 413, 429, 500, 502, 503, 504],
        )
        session.mount("https://", HTTPAdapter(max_retries=retries))
        return session

    # Function to GET content from URL with retries
    def getURL(self, url, headers="NONE"):
        """Retrieves JSON content from a URL via GET, with retries.

        Parameters:
        -----------
        url : str
            The address to query.
        headers : dict or "NONE"
            Headers to send; the default string "NONE" sends no extra headers.

        Returns:
        --------
        The response body parsed as JSON.
        """
        # "NONE" is the historical sentinel for "no custom headers".
        request_headers = None if headers == "NONE" else headers
        with self._retrying_session() as url_session:
            url_result = url_session.get(url, headers=request_headers)
            logging.info("HTTP %s:%s", url_result.status_code, url)
            return url_result.json()

    #####################################################################
    # Function to retrieve content via POST from URL with retries
    def postURL(self, url, body, headers="NONE"):
        """Retrieves JSON content from a URL via POST.

        Parameters:
        -----------
        url : str
            The address to query.
        body : dict or str
            The query, either as a dictionary or as a JSON string.
        headers : dict or "NONE"
            Headers to send; the default string "NONE" sends no extra headers.

        Returns:
        --------
        The response body parsed as JSON, or a dictionary with ``status`` 400
        and a ``message`` when body is neither a dict nor a str.

        Raises:
        -------
        json.JSONDecodeError
            If body is a string that is not valid JSON.
        """
        # Validate the body first so no session is opened for bad input.
        if isinstance(body, dict):
            body_json = body
        elif isinstance(body, str):
            body_json = json.loads(body)
        else:
            return {
                "status": 400,
                "message": "Bad JSON body - not a proper query string",
            }

        # "NONE" is the historical sentinel for "no custom headers".
        request_headers = None if headers == "NONE" else headers
        with self._retrying_session() as url_session:
            url_result = url_session.post(url, json=body_json, headers=request_headers)
            logging.info("HTTP %s:%s", url_result.status_code, url)
            return url_result.json()

    #####################################################################
    # Metadata Search Functions
    def list_datasets(self, limit: int = 10, offset: int = 0) -> dict:
        """Lists datasets and associated metadata.

        Parameters:
        -----------
        limit : int
            Max number of datasets returned.
        offset : int
            Offset used for pagination of results.

        Returns:
        --------
        A json with the results.
        """
        # Copy the shared class-level headers so the API key is never written
        # into default_headers (which would leak it to every instance and to
        # any non-SciCrunch URL configured later).
        request_headers = dict(self.default_headers)
        if "api.scicrunch.io" not in self.algolia_api:
            # If user changes URL don't add ES specific information
            list_url = self.algolia_api
        else:
            list_url = f"{self.algolia_api}?from={offset}&size={limit}"
            request_headers["apikey"] = self.scicrunch_api_key
        return self.getURL(list_url, headers=request_headers)

    def search_datasets(self, query: str = '{"query": { "match_all": {}}}') -> dict:
        """Gets datasets matching specified query.

        The query is posted to the configured search endpoint (algolia_api).

        Parameters:
        -----------
        query : str
            Elasticsearch JSON query.

        Returns:
        --------
        A json with the results.
        """
        # Copy the shared class-level headers; see list_datasets.
        request_headers = dict(self.default_headers)
        if "api.scicrunch.io" in self.algolia_api:
            # If user hasn't changed URL add ES specific information
            request_headers["apikey"] = self.scicrunch_api_key
        return self.postURL(self.algolia_api, body=query, headers=request_headers)