aporia

def init( token: Optional[str] = None, host: Optional[str] = None, environment: Optional[str] = None, port: Optional[int] = None, verbose: Optional[bool] = None, throw_errors: Optional[bool] = None, debug: Optional[bool] = None, http_timeout_seconds: Optional[int] = None, verify_ssl: bool = True):

Initialize the Aporia SDK.

Arguments:
  • token: Authentication token.
  • host: Controller host.
  • environment: Environment in which Aporia is initialized (e.g. production, staging).
  • port: Controller port. Defaults to 443.
  • verbose: True to enable verbose error messages. Defaults to False.
  • throw_errors: True to cause errors to be raised as exceptions. Defaults to False.
  • debug: True to enable debug logs and stack traces in log messages. Defaults to False.
  • http_timeout_seconds: HTTP timeout in seconds. Defaults to 30.
  • verify_ssl: True to verify SSL certificates when connecting to the controller. Defaults to True.
Notes:
  • The token, host and environment parameters are required.
  • All of the parameters here can also be defined as environment variables:
    • token -> APORIA_TOKEN
    • host -> APORIA_HOST
    • environment -> APORIA_ENVIRONMENT
    • port -> APORIA_PORT
    • verbose -> APORIA_VERBOSE
    • throw_errors -> APORIA_THROW_ERRORS
    • debug -> APORIA_DEBUG
    • http_timeout_seconds -> APORIA_HTTP_TIMEOUT_SECONDS
  • Values passed as parameters to aporia.init() override the values from the corresponding environment variables.
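Example:

A minimal initialization sketch; the token, host and environment values below are placeholders:

    import aporia

    aporia.init(
        token="<YOUR_APORIA_TOKEN>",
        host="<YOUR_APORIA_HOST>",
        environment="production",
        verbose=True,
    )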
@safe_api_function('Creating model failed, error: {}')
def create_model( model_id: str, name: str, description: Optional[str] = None, owner: Optional[str] = None, color: Union[aporia.ModelColor, str, NoneType] = None, icon: Union[aporia.ModelIcon, str, NoneType] = None, tags: Dict[str, str] = None) -> str:

Creates a new model.

Arguments:
  • model_id: A unique identifier for the new model, which will be used in all future operations
  • name: A name for the new model, which will be displayed in Aporia's dashboard
  • description: A description of the model
  • owner: The email of the model owner
  • color: A color to distinguish the model in Aporia's dashboard. Defaults to blue
  • icon: An icon that indicates the model's designation. Defaults to general
  • tags: A mapping of tag keys to tag values
Returns:

Model ID.

Notes:
  • If this model_id already exists, NO EXCEPTION WILL BE RAISED! Instead, the same model ID will be returned.
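Example:

A usage sketch; the model identifier, owner and tag values are illustrative only:

    import aporia

    model_id = aporia.create_model(
        model_id="fraud-detection",
        name="Fraud Detection",
        description="Detects fraudulent transactions",
        owner="ml-team@example.com",
        color=aporia.ModelColor.BLUE,
        icon=aporia.ModelIcon.FRAUD_DETECTION,
        tags={"team": "risk"},
    )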
@safe_api_function('Creating model version failed, error: {}')
def create_model_version( model_id: str, model_version: str, model_type: str, features: Dict[str, Union[str, Dict[str, Any]]], predictions: Dict[str, Union[str, Dict[str, Any]]], raw_inputs: Optional[Dict[str, Union[str, Dict[str, Any]]]] = None, metrics: Optional[Dict[str, str]] = None, model_data_type: Optional[str] = None, labels: Optional[List[str]] = None, multiclass_labels: Optional[List[str]] = None, feature_importance: Optional[Dict[str, float]] = None, mapping: Optional[Dict[str, str]] = None) -> Optional[aporia.Model]:

Creates a new model version, and defines a schema for it.

Arguments:
  • model_id: Model identifier, as received from the Aporia dashboard.
  • model_version: Model version - this can be any string that represents the model version, such as "v1" or a git commit hash.
  • model_type: Model type (also known as objective - see notes).
  • features: Schema for model features (See notes).
  • predictions: Schema for prediction results (See notes).
  • raw_inputs: Schema for raw inputs (See notes).
  • metrics: Schema for prediction metrics (See notes).
  • model_data_type: Model data type.
  • labels: Labels of a multi-label, multiclass or binary model. Deprecated.
  • multiclass_labels: Labels of a multi-label, multiclass or binary model. Same as "labels"; deprecated.
  • feature_importance: Features' importance.
  • mapping: General mapping (See notes).
Notes:
  • A schema is a dict, in which the keys are the fields you wish to report, and the values are the types of those fields. For example:

    {
        "feature1": "numeric",
        "feature2": "datetime"
    }
    
  • The supported model types are:

    • "regression" - for regression models
    • "binary" - for binary classification models
    • "multiclass" - for multiclass classification models
    • "multi-label" - for multi-label classification models
    • "ranking" - for ranking models
  • The valid field types (and corresponding python types) are:

    • "numeric" - float, int
    • "categorical" - int
    • "boolean" - bool
    • "string" - str
    • "datetime" - datetime.datetime, or str representing a datetime in ISO-8601 format
    • "vector" - list of floats
    • "text" - str (to be used as free text)
    • "dict" - dict[str, int]
  • The supported data types are:
    • "tabular"
    • "nlp"
  • The feature_importance is a mapping from feature name to its importance (float). For example:

    {
        "feature1": 1,
        "feature2": 2
    }

  • The mapping allowed fields are:

    • batch_id_column_name: The name of the key in the raw_inputs dict that holds the value of the batch_id.
    • relevance_column_name: The name of the key in the predictions dict that holds the value of the relevance score of models of type ranking.
    • actual_relevance_column_name: The name of the key in the actuals dict that holds the value of the relevance score of models of type ranking.
Returns:

Model object for the new version.
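Example:

A schema-definition sketch; the field names, types and version string are illustrative:

    import aporia

    apr_model = aporia.create_model_version(
        model_id="fraud-detection",
        model_version="v1",
        model_type="binary",
        features={
            "amount": "numeric",
            "merchant_category": "categorical",
            "transaction_time": "datetime",
        },
        predictions={
            "is_fraud": "boolean",
            "score": "numeric",
        },
    )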

def generate_features_schema_from_shape(features_shape: Tuple) -> collections.OrderedDict:

Returns a features schema, derived from an ndarray shape, to use in version creation.

Arguments:
  • features_shape (Tuple): the shape of the features array.
Returns:

OrderedDict: The object to pass to features in the version schema.
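Example:

A sketch of deriving a features schema from an ndarray; the model ID, version string and prediction field are illustrative:

    import numpy as np
    import aporia

    features = np.random.rand(1000, 20)
    features_schema = aporia.generate_features_schema_from_shape(features.shape)

    apr_model = aporia.create_model_version(
        model_id="fraud-detection",
        model_version="v2",
        model_type="binary",
        features=features_schema,
        predictions={"score": "numeric"},
    )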

@safe_api_function('Deleting model failed, error: {}')
def delete_model(model_id: str):

Deletes a model.

Arguments:
  • model_id: ID of the model to delete
def shutdown():

Shuts down the Aporia SDK.

Notes:
  • It is advised to call flush() before calling shutdown(), to ensure that all of the data that was sent reaches the controller.
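Example:

A minimal teardown sketch, assuming apr_model is a Model or InferenceModel instance created earlier:

    apr_model.flush()
    aporia.shutdown()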
@safe_api_function('Creating model tags failed, error: {}')
def add_model_tags(model_id: str, tags: Dict[str, str]):

Adds or updates tags to an existing model.

Each tag is a key:value pair; both the key and the value must be strings.

Arguments:
  • model_id: Model ID
  • tags: A mapping of tag keys to tag values
Notes:
  • Each model is restricted to 10 tags
  • Tag keys are always converted to lowercase
  • If the tags parameter contains tag keys that were already defined for the model, their values will be updated.
@safe_api_function('Deleting model tag failed, error: {}')
def delete_model_tag(model_id: str, tag_key: str):

Deletes a model tag.

Arguments:
  • model_id: Model ID
  • tag_key: Tag key to delete
Notes:
  • This function is best-effort; it will not fail if the tag doesn't exist.
@safe_api_function('Fetching model tags failed, error: {}')
def get_model_tags(model_id: str) -> Optional[Dict[str, str]]:

Fetches the tag keys and values of a model.

Arguments:
  • model_id: Model ID
Returns:

A dict mapping tag keys to values
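Example:

A tag-management sketch covering add_model_tags, get_model_tags and delete_model_tag; the model ID and tag values are illustrative:

    import aporia

    aporia.add_model_tags("fraud-detection", {"Team": "risk", "stage": "production"})

    # Tag keys are lowercased, so the result includes "team" rather than "Team".
    tags = aporia.get_model_tags("fraud-detection")

    aporia.delete_model_tag("fraud-detection", "stage")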

class Model(aporia.InferenceModel):

Model object for logging model events.

Model(model_id: str, model_version: str)

Initializes a model object.

Arguments:
  • model_id: Model identifier, as received from the Aporia dashboard.
  • model_version: Model version - this can be any string that represents the model version, such as "v1" or a git commit hash.
def log_training_set( self, features: Union[pandas.core.frame.DataFrame, numpy.ndarray], predictions: Optional[pandas.core.frame.DataFrame] = None, labels: Optional[pandas.core.frame.DataFrame] = None, raw_inputs: Optional[pandas.core.frame.DataFrame] = None, log_sample: bool = True, sample_size: int = 1000):

Logs aggregations of the whole training set, and optionally a sample of the data.

Arguments:
  • features: Training set features
  • predictions: Training set predictions
  • labels: Training set labels
  • raw_inputs: Training set raw inputs.
  • log_sample: Whether to log a sample of the data.
  • sample_size: Number of records to sample.
Notes:
  • Each dataframe corresponds to a field category defined in create_model_version:
    • features -> features
    • predictions -> predictions
    • labels -> predictions
    • raw_inputs -> raw_inputs
  • Each column in the dataframe should match a field defined in create_model_version
    • Missing fields will be handled as missing values
    • Columns that do not match a defined field will be ignored
    • The column name must match the field name
  • This function is blocking and may take a while to finish running.
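Example:

A training-set logging sketch; the column names are assumed to match fields defined in create_model_version:

    import pandas as pd
    import aporia

    apr_model = aporia.Model(model_id="fraud-detection", model_version="v1")

    train_features = pd.DataFrame({"amount": [12.5, 80.0], "merchant_category": [3, 7]})
    train_labels = pd.DataFrame({"is_fraud": [False, True]})

    apr_model.log_training_set(features=train_features, labels=train_labels)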
def log_training_sample( self, features: Union[pandas.core.frame.DataFrame, numpy.ndarray], labels: pandas.core.frame.DataFrame, raw_inputs: Optional[pandas.core.frame.DataFrame] = None, sample_size: int = 1000):

Logs a sample of the training data.

Arguments:
  • features: Training set features
  • labels: Training set labels
  • raw_inputs: Training set raw inputs
  • sample_size: Number of records to sample
Notes:
  • Each dataframe corresponds to a field category defined in create_model_version:
    • features -> features
    • labels -> predictions
  • Each column in the dataframe should match a field defined in create_model_version
    • Missing fields will be handled as missing values
    • Columns that do not match a defined field will be ignored
    • The column name must match the field name
  • This function is blocking and may take a while to finish running.
def log_test_set( self, features: Union[pandas.core.frame.DataFrame, numpy.ndarray], predictions: pandas.core.frame.DataFrame, labels: pandas.core.frame.DataFrame, raw_inputs: Optional[pandas.core.frame.DataFrame] = None, confidences: Optional[numpy.ndarray] = None):

Logs test data.

Arguments:
  • features: Test set features
  • predictions: Test set predictions
  • labels: Test set labels
  • raw_inputs: Test set raw inputs.
  • confidences: Confidence values for the test predictions.
Notes:
  • Each dataframe corresponds to a field category defined in create_model_version:
    • features -> features
    • predictions -> predictions
    • labels -> predictions
    • raw_inputs -> raw_inputs
  • Each column in the dataframe should match a field defined in create_model_version
    • Missing fields will be handled as missing values
    • Columns that do not match a defined field will be ignored
    • The column name must match the field name
  • This function is blocking and may take a while to finish running.
def log_batch_pyspark_raw_inputs(self, ids: Any, raw_inputs: Any):

Logs raw inputs of multiple predictions.

Arguments:
  • ids: Prediction identifiers
  • raw_inputs: Raw inputs of each prediction
Notes:
  • The ids dataframe must contain exactly one column
  • The ids and raw_inputs dataframes must have the same number of rows
def log_batch_pyspark_actuals(self, ids: Any, actuals: Any):

Logs actual values of multiple predictions.

Arguments:
  • ids: Prediction identifiers
  • actuals: Actual prediction results of each prediction
Notes:
  • The ids dataframe must contain exactly one column
  • The ids and actuals dataframes must have the same number of rows
def log_batch_pyspark_prediction( self, data: Any, id_column: Optional[str] = None, timestamp_column: Optional[str] = None, features: Optional[Mapping[str, str]] = None, predictions: Optional[Mapping[str, str]] = None, raw_inputs: Optional[Mapping[str, str]] = None, labels: Optional[Mapping[str, str]] = None, spark_options: Optional[Mapping[str, str]] = None):

Logs multiple predictions.

Arguments:
  • data: PySpark dataframe
  • id_column: Optional dataframe column to use as an id.
  • timestamp_column: Optional dataframe column to use as a timestamp.
  • features: A mapping of feature names (from the schema) to dataframe columns
  • predictions: A mapping of predictions names (from the schema) to dataframe columns
  • raw_inputs: A mapping of raw input names (from the schema) to dataframe columns
  • labels: A mapping of label names (from the schema) to dataframe columns
  • spark_options: Optional configuration extensions for the Spark Elasticsearch connector. See https://www.elastic.co/guide/en/elasticsearch/hadoop/master/configuration.html
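Example:

A PySpark batch-logging sketch; the dataframe columns and schema field names are illustrative:

    from pyspark.sql import SparkSession

    import aporia

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [("p1", 12.5, 3, 0.07), ("p2", 80.0, 7, 0.93)],
        ["prediction_id", "amount", "merchant_category", "score"],
    )

    apr_model = aporia.Model(model_id="fraud-detection", model_version="v1")
    apr_model.log_batch_pyspark_prediction(
        data=df,
        id_column="prediction_id",
        features={"amount": "amount", "merchant_category": "merchant_category"},
        predictions={"score": "score"},
    )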
def log_streaming_pyspark_prediction( self, data: Any, id_column: Optional[str] = None, timestamp_column: Optional[str] = None, features: Optional[Mapping[str, str]] = None, predictions: Optional[Mapping[str, str]] = None, raw_inputs: Optional[Mapping[str, str]] = None, labels: Optional[Mapping[str, str]] = None, spark_options: Optional[Mapping[str, str]] = None):

Logs a stream of predictions.

Arguments:
  • data: PySpark dataframe
  • id_column: Optional dataframe column to use as an id.
  • timestamp_column: Optional dataframe column to use as a timestamp.
  • features: A mapping of feature names (from the schema) to dataframe columns
  • predictions: A mapping of predictions names (from the schema) to dataframe columns
  • raw_inputs: A mapping of raw input names (from the schema) to dataframe columns
  • labels: A mapping of label names (from the schema) to dataframe columns
  • spark_options: Optional configuration extensions for the Spark Elasticsearch connector. See https://www.elastic.co/guide/en/elasticsearch/hadoop/master/configuration.html
def log_pyspark_training_set( self, data: Any, features: Optional[Mapping[str, str]] = None, predictions: Optional[Mapping[str, str]] = None, raw_inputs: Optional[Mapping[str, str]] = None):

Logs training data from PySpark DataFrames.

Arguments:
  • data: PySpark DataFrame containing all of the data
  • features: Optional[Mapping[str, str]] mapping of feature names (as configured in the schema) to the corresponding dataframe columns
  • predictions: Optional[Mapping[str, str]] mapping of prediction names (as configured in the schema) to the corresponding dataframe columns
  • raw_inputs: Optional[Mapping[str, str]] mapping of raw input names (as configured in the schema) to the corresponding dataframe columns
def log_pyspark_test_set( self, features: Any, predictions: Any, labels: Any, raw_inputs: Optional[Any] = None):

Logs test data from PySpark DataFrames.

Arguments:
  • features: Test set features
  • predictions: Test set predictions
  • labels: Test set labels
  • raw_inputs: Test set raw inputs.
Notes:
  • Each dataframe corresponds to a field category defined in create_model_version:
    • features -> features
    • predictions -> predictions
    • labels -> predictions
    • raw_inputs -> raw_inputs
  • Each column in the dataframe should match a field defined in create_model_version
    • Missing fields will be handled as missing values
    • Columns that do not match a defined field will be ignored
    • The column name must match the field name
  • This function is blocking and may take a while to finish running.
class InferenceModel(aporia.core.base_model.BaseModel):

Model object for logging inference events.

InferenceModel(model_id: str, model_version: str)

Initializes an inference model object.

Arguments:
  • model_id: Model identifier, as received from the Aporia dashboard.
  • model_version: Model version - this can be any string that represents the model version, such as "v1" or a git commit hash.
@validate_model_ready
def log_prediction( self, features: Union[Dict[str, Union[float, int, str, bool, datetime.datetime, List[int]]], numpy.ndarray], predictions: Dict[str, Union[float, int, str, bool, datetime.datetime, List[int]]], id: Optional[str] = None, metrics: Optional[Dict[str, Union[float, int, str, bool, datetime.datetime, List[int]]]] = None, occurred_at: Optional[datetime.datetime] = None, confidence: Union[float, List[float], NoneType] = None, raw_inputs: Optional[Dict[str, Union[float, int, str, bool, datetime.datetime, List[int]]]] = None, actuals: Optional[Dict[str, Union[float, int, str, bool, datetime.datetime, List[int]]]] = None):

Logs a single prediction.

Arguments:
  • id: Prediction identifier.
  • features: Values for all the features in the prediction
  • predictions: Prediction result
  • metrics: Prediction metrics.
  • occurred_at: Prediction timestamp.
  • confidence: Prediction confidence.
  • raw_inputs: Raw inputs of the prediction.
  • actuals: Actual prediction results.
Note:
  • If occurred_at is None, it will be reported as datetime.now()
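Example:

A single-prediction sketch; the field names must match the version schema and are illustrative here:

    from datetime import datetime, timezone

    import aporia

    apr_model = aporia.InferenceModel(model_id="fraud-detection", model_version="v1")
    apr_model.log_prediction(
        id="prediction-123",
        features={"amount": 12.5, "merchant_category": 3},
        predictions={"is_fraud": False, "score": 0.07},
        occurred_at=datetime.now(timezone.utc),
    )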
@validate_model_ready
def log_batch_prediction(self, batch_predictions: Iterable[dict]):

Logs multiple predictions.

Arguments:
  • batch_predictions: An iterable that produces prediction dicts.

    • Each prediction dict MUST contain the following keys:

      • features (Dict[str, FieldValue]): Values for all the features in the prediction
      • predictions (Dict[str, FieldValue]): Prediction result
    • Each prediction dict MAY also contain the following keys:

      • id (str): Prediction identifier.
      • occurred_at (datetime): Prediction timestamp.
      • metrics (Dict[str, FieldValue]): Prediction metrics
      • confidence (Union[float, List[float]]): Prediction confidence.
      • raw_inputs (Dict[str, FieldValue]): Raw inputs of the prediction.
      • actuals (Dict[str, FieldValue]): Actual prediction results.
Notes:
  • If occurred_at is None in any of the predictions, it will be reported as datetime.now()
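Example:

A batch-logging sketch built from the keys listed above; values are illustrative and apr_model is assumed to be an InferenceModel created earlier:

    apr_model.log_batch_prediction(
        [
            {
                "id": "prediction-124",
                "features": {"amount": 80.0, "merchant_category": 7},
                "predictions": {"is_fraud": True, "score": 0.93},
            },
            {
                "id": "prediction-125",
                "features": {"amount": 5.0, "merchant_category": 2},
                "predictions": {"is_fraud": False, "score": 0.02},
            },
        ]
    )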
@validate_model_ready
def log_raw_inputs( self, id: str, raw_inputs: Dict[str, Union[float, int, str, bool, datetime.datetime, List[int]]]):

Logs raw inputs of a single prediction.

Arguments:
  • id: Prediction identifier.
  • raw_inputs: Raw inputs of the prediction.
@validate_model_ready
def log_batch_raw_inputs(self, batch_raw_inputs: Iterable[dict]):

Logs raw inputs of multiple predictions.

Arguments:
  • batch_raw_inputs: An iterable that produces raw_inputs dicts.

    • Each dict MUST contain the following keys:
      • id (str): Prediction identifier.
      • raw_inputs (Dict[str, FieldValue]): Raw inputs of the prediction.
@validate_model_ready
def log_actuals( self, id: str, actuals: Dict[str, Union[float, int, str, bool, datetime.datetime, List[int]]]):

Logs actual values of a single prediction.

Arguments:
  • id: Prediction identifier.
  • actuals: Actual prediction results.
Note:
  • The fields reported in actuals must be a subset of the fields reported in predictions.
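Example:

A short actuals sketch, assuming apr_model is the InferenceModel from the earlier sketches and "is_fraud" was reported as a prediction field:

    apr_model.log_actuals(id="prediction-123", actuals={"is_fraud": True})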
@validate_model_ready
def log_batch_actuals(self, batch_actuals: Iterable[dict]):

Logs actual values of multiple predictions.

Arguments:
  • batch_actuals: An iterable that produces actuals dicts.

    • Each dict MUST contain the following keys:
      • id (str): Prediction identifier.
      • actuals (Dict[str, FieldValue]): Actual prediction results.
Note:
  • The fields reported in actuals must be a subset of the fields reported in predictions.
@validate_model_ready
def log_json(self, data: Any):

Logs arbitrary data.

Arguments:
  • data: Data to log, must be JSON serializable
@validate_model_ready
def upload_model_artifact( self, model_artifact: Any, artifact_type: Union[str, aporia.inference.types.model_artifact.ModelArtifactType]):

Uploads a binary model artifact.

Arguments:
  • model_artifact: Binary model artifact.
  • artifact_type: The type of model artifact (see below)
Model Artifact Types:
  • onnx
  • h5
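Example:

An artifact-upload sketch; the file path is a placeholder and apr_model is assumed to be an InferenceModel created earlier:

    with open("model.onnx", "rb") as f:
        apr_model.upload_model_artifact(model_artifact=f.read(), artifact_type="onnx")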
@validate_model_ready
def set_feature_importance(self, feature_importance: Dict[str, float]):

Updates the model's feature importance.

Arguments:
  • feature_importance: Mapping from feature name to importance value
@validate_model_ready
def log_index_to_word_mapping(self, index_to_word_mapping: Dict[Any, Any]):

Logs an index-to-word mapping.

Arguments:
  • index_to_word_mapping: A mapping from numeric indices to words.
@validate_model_ready
def connect_serving( self, data_source: aporia.core.types.data_source.DataSource, id_column: str, timestamp_column: str, predictions: Optional[Dict[str, str]] = None, features: Optional[Dict[str, str]] = None, labels: Optional[Dict[str, str]] = None, raw_inputs: Optional[Dict[str, str]] = None, http_timeout_seconds: Optional[int] = None):

Connect to external serving data set.

Arguments:
  • data_source: The data source to fetch the data set from.
  • id_column: The name of the id column.
  • timestamp_column: The name of the timestamp column.
  • features: Mapping from feature name to column name.
  • predictions: Mapping from prediction name to column name.
  • labels: Mapping from actual name to column name.
  • raw_inputs: Mapping from raw input name to column name.
  • http_timeout_seconds: HTTP timeout in seconds. Defaults to 10 minutes.
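Example:

A serving-connection sketch using a generic JDBC source; the connection URL, query, credentials and column mappings are placeholders, and apr_model is assumed to be an InferenceModel created earlier:

    import aporia

    data_source = aporia.JDBCDataSource(
        url="jdbc:postgresql://db.example.com:5432/serving",
        query="SELECT * FROM predictions",
        user="reader",
        password="secret",
    )

    apr_model.connect_serving(
        data_source=data_source,
        id_column="prediction_id",
        timestamp_column="predicted_at",
        features={"amount": "amount"},
        predictions={"score": "score"},
    )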
@validate_model_ready
def connect_actuals( self, data_source: aporia.core.types.data_source.DataSource, id_column: str, timestamp_column: str, labels: Optional[Dict[str, str]] = None, http_timeout_seconds: Optional[int] = None):

Connect to external actual data set.

Arguments:
  • data_source: The data source to fetch the data set from.
  • id_column: The name of the id column.
  • timestamp_column: The name of the column containing the time at which each actual was updated.
  • labels: Mapping from prediction name to the column holding the actual value.
  • http_timeout_seconds: HTTP timeout in seconds. Defaults to 10 minutes.
@validate_model_ready
def connect_training( self, data_source: aporia.core.types.data_source.DataSource, id_column: str, timestamp_column: str, predictions: Optional[Dict[str, str]] = None, features: Optional[Dict[str, str]] = None, labels: Optional[Dict[str, str]] = None, raw_inputs: Optional[Dict[str, str]] = None, http_timeout_seconds: Optional[int] = None):

Connect to external training data set.

Arguments:
  • data_source: The data source to fetch the data set from.
  • id_column: The name of the id column.
  • timestamp_column: The name of the timestamp column.
  • features: Mapping from feature name to column name.
  • predictions: Mapping from prediction name to column name.
  • labels: Mapping from prediction name to column name.
  • raw_inputs: Mapping from raw input name to column name.
  • http_timeout_seconds: HTTP timeout in seconds. Defaults to 10 minutes.
@validate_model_ready
def connect_testing( self, data_source: aporia.core.types.data_source.DataSource, id_column: str, timestamp_column: str, predictions: Optional[Dict[str, str]] = None, features: Optional[Dict[str, str]] = None, labels: Optional[Dict[str, str]] = None, raw_inputs: Optional[Dict[str, str]] = None, http_timeout_seconds: Optional[int] = None):

Connect to external test data set.

Arguments:
  • data_source: The data source to fetch the data set from.
  • id_column: The name of the id column.
  • timestamp_column: The name of the timestamp column.
  • features: Mapping from feature name to column name.
  • predictions: Mapping from prediction name to column name.
  • labels: Mapping from prediction name to column name.
  • raw_inputs: Mapping from raw input name to column name.
  • http_timeout_seconds: HTTP timeout in seconds. Defaults to 10 minutes.
@validate_model_ready
def flush(self, timeout: Optional[int] = None) -> Optional[int]:

Waits for all currently scheduled tasks to finish.

Arguments:
  • timeout: Maximum number of seconds to wait for tasks to complete. Defaults to None (no timeout).
Returns:

Number of tasks that haven't finished running.

Inherited Members
aporia.core.base_model.BaseModel
handle_error
class ModelColor(enum.Enum):

Model colors.

BLUE = <ModelColor.BLUE: 'blue'>
ARCTIC_BLUE = <ModelColor.ARCTIC_BLUE: 'arctic_blue'>
GREEN = <ModelColor.GREEN: 'green'>
TURQUOISE = <ModelColor.TURQUOISE: 'turquoise'>
PINK = <ModelColor.PINK: 'pink'>
PURPLE = <ModelColor.PURPLE: 'purple'>
YELLOW = <ModelColor.YELLOW: 'yellow'>
RED = <ModelColor.RED: 'red'>
Inherited Members
enum.Enum
name
value
class ModelIcon(enum.Enum):

Model Icons.

GENERAL = <ModelIcon.GENERAL: 'general'>
CHURN_AND_RETENTION = <ModelIcon.CHURN_AND_RETENTION: 'churn-and-retention'>
CONVERSION_PREDICT = <ModelIcon.CONVERSION_PREDICT: 'conversion-predict'>
ANOMALY = <ModelIcon.ANOMALY: 'anomaly'>
DYNAMIC_PRICING = <ModelIcon.DYNAMIC_PRICING: 'dynamic-pricing'>
EMAIL_FILTERING = <ModelIcon.EMAIL_FILTERING: 'email-filtering'>
DEMAND_FORECASTING = <ModelIcon.DEMAND_FORECASTING: 'demand-forecasting'>
LTV = <ModelIcon.LTV: 'ltv'>
PERSONALIZATION = <ModelIcon.PERSONALIZATION: 'personalization'>
FRAUD_DETECTION = <ModelIcon.FRAUD_DETECTION: 'fraud-detection'>
CREDIT_RISK = <ModelIcon.CREDIT_RISK: 'credit-risk'>
RECOMMENDATIONS = <ModelIcon.RECOMMENDATIONS: 'recommendations'>
Inherited Members
enum.Enum
name
value
class AthenaDataSource(aporia.JDBCDataSource):

AWS Athena data source.

AthenaDataSource( url: str, query: str, s3_output_location: str, user: Optional[str] = None, password: Optional[str] = None, sample_size: float = 1, select_expr: Optional[List[str]] = None, read_options: Optional[Dict[str, Any]] = None)

Initializes an AthenaDataSource.

Arguments:
  • url: Database connection URL
  • query: SQL query to read data from the database
  • s3_output_location: Path to S3 bucket for storing query results
  • user: Database user
  • password: Database password
  • sample_size: Fraction of data to sample
  • select_expr: Select expressions to apply to the dataframe after reading
  • read_options: Additional spark read options
type: aporia.core.types.data_source.DataSourceType

See base class.

def serialize_config(self) -> Dict[str, Any]:

See base class.

Inherited Members
aporia.core.types.data_source.DataSource
serialize
class BigQueryDataSource(aporia.core.types.data_source.SparkDataSource):

BigQuery data source.

BigQueryDataSource( credentials_base64: str, table: str, dataset: Optional[str] = None, project: Optional[str] = None, parent_project: Optional[str] = None, sample_size: float = 1.0, select_expr: Optional[List[str]] = None, read_options: Optional[Dict[str, Any]] = None)

Initializes a BigQueryDataSource.

Arguments:
  • credentials_base64: Base64 encoded JSON string containing GCP service account details.
  • table: Table to query
  • dataset: Dataset to query
  • project: Project name
  • parent_project: Parent project name
  • sample_size: Fraction of data to sample
  • select_expr: Select expressions to apply to the dataframe after reading
  • read_options: Additional spark read options
type: aporia.core.types.data_source.DataSourceType

See base class.

def serialize_config(self) -> Dict[str, Any]:

See base class.

Inherited Members
aporia.core.types.data_source.DataSource
serialize
class JDBCDataSource(aporia.core.types.data_source.SparkDataSource):

Generic JDBC data source.

JDBCDataSource( url: str, query: str, user: Optional[str] = None, password: Optional[str] = None, sample_size: float = 1, select_expr: Optional[List[str]] = None, read_options: Optional[Dict[str, Any]] = None)

Initializes a JDBCDataSource.

Arguments:
  • url: Database connection URL
  • query: SQL query to read data from the database
  • user: Database user
  • password: Database password
  • sample_size: Fraction of data to sample
  • select_expr: Select expressions to apply to the dataframe after reading
  • read_options: Additional spark read options
type: aporia.core.types.data_source.DataSourceType

See base class.

def serialize_config(self) -> Dict[str, Any]:

See base class.

Inherited Members
aporia.core.types.data_source.DataSource
serialize
class PostgresJDBCDataSource(aporia.JDBCDataSource):

Postgres (via JDBC) data source.

type: aporia.core.types.data_source.DataSourceType

See base class.

Inherited Members
JDBCDataSource
JDBCDataSource
serialize_config
aporia.core.types.data_source.DataSource
serialize
class S3DataSource(aporia.core.types.data_source.SparkDataSource):

S3 data source.

S3DataSource( object_path: str, object_format: Union[aporia.core.types.data_source.S3FileFormat, str], sample_size: float = 1.0, select_expr: Optional[List[str]] = None, read_options: Optional[Dict[str, Any]] = None)

Initializes an S3DataSource.

Arguments:
  • object_path: The path in S3 to the object, excluding the s3:// prefix (e.g. bucket-name/file.parquet)
  • object_format: Type of the input file (Parquet, CSV, JSON, etc.)
  • sample_size: Fraction of data to sample
  • select_expr: Select expressions to apply to the dataframe after reading
  • read_options: Additional spark read options
type: aporia.core.types.data_source.DataSourceType

See base class.

def serialize_config(self) -> Dict[str, Any]:

See base class.

Inherited Members
aporia.core.types.data_source.DataSource
serialize
S3FileFormat
class SnowflakeDataSource(aporia.core.types.data_source.SparkDataSource):

Snowflake data source.

SnowflakeDataSource( url: str, query: str, user: str, password: str, database: str, schema: str, warehouse: Optional[str] = None, sample_size: float = 1.0, select_expr: Optional[List[str]] = None, read_options: Optional[Dict[str, Any]] = None)

Initializes a SnowflakeDataSource.

Arguments:
  • url: The full Snowflake URL of your instance
  • query: SQL query to read data from the database
  • user: Username for database connection
  • password: Password for database connection
  • database: Database name
  • schema: Schema name
  • warehouse: The default virtual warehouse to use
  • sample_size: Fraction of data to sample
  • select_expr: Select expressions to apply to the dataframe after reading
  • read_options: Additional spark read options
type: aporia.core.types.data_source.DataSourceType

See base class.

def serialize_config(self) -> Dict[str, Any]:

See base class.

Inherited Members
aporia.core.types.data_source.DataSource
serialize
class GlueDataSource(aporia.core.types.data_source.HiveDataSource):

Glue data source.

GlueDataSource( query: str, sample_size: float = 1.0, select_expr: Optional[List[str]] = None)

Initializes a GlueDataSource.

Arguments:
  • query: SQL query to read data from the database
  • sample_size: Fraction of data to sample
  • select_expr: Select expressions to apply to the dataframe after reading
type: aporia.core.types.data_source.DataSourceType

See base class.

Inherited Members
aporia.core.types.data_source.HiveDataSource
serialize_config
aporia.core.types.data_source.DataSource
serialize