374 lines
10 KiB
Python
374 lines
10 KiB
Python
import array
|
|
import numbers
|
|
from collections.abc import Mapping, Sequence
|
|
from typing import Any, Iterable
|
|
|
|
from .const import INT_MAX_VALUE, INT_MIN_VALUE, LONG_MAX_VALUE, LONG_MIN_VALUE
|
|
from ._validate_common import ValidationError, ValidationErrorData
|
|
from .schema import extract_record_type, extract_logical_type, schema_name, parse_schema
|
|
from .logical_writers import LOGICAL_WRITERS
|
|
from ._schema_common import UnknownType
|
|
from .types import Schema, NamedSchemas
|
|
|
|
# Sentinel used by _validate_record for a field that is absent from the datum
# and has no "default" in the schema. _validate turns it into None, or into a
# validation failure when the "strict" option is set. It is distinct from an
# explicit None supplied by the caller.
NoValue = object()
|
|
|
|
|
|
def _validate_null(datum, **kwargs):
|
|
"""Checks that the data value is None."""
|
|
return datum is None
|
|
|
|
|
|
def _validate_boolean(datum, **kwargs):
|
|
"""Check that the data value is bool instance"""
|
|
return isinstance(datum, bool)
|
|
|
|
|
|
def _validate_string(datum, **kwargs):
|
|
"""Check that the data value is string"""
|
|
return isinstance(datum, str)
|
|
|
|
|
|
def _validate_bytes(datum, **kwargs):
|
|
"""Check that the data value is python bytes type"""
|
|
return isinstance(datum, (bytes, bytearray))
|
|
|
|
|
|
def _validate_int(datum, **kwargs):
    """
    Check that the data value is a non floating point number that fits in a
    signed 32-bit integer.

    Int32 = -2147483648 <= datum <= 2147483647

    conditional python types: int, numbers.Integral (``bool`` is rejected)

    Any extra keyword arguments are accepted and ignored. The result is always
    a plain ``bool``.
    """
    # bool is a subclass of int but must never validate as an int.
    if isinstance(datum, bool):
        return False
    if not isinstance(datum, (int, numbers.Integral)):
        return False
    # Some numbers.Integral implementations (for example NumPy scalars) return
    # their own boolean type from comparisons. Coerce to a real bool so that
    # callers checking ``result is False`` still detect the failure.
    return bool(INT_MIN_VALUE <= datum <= INT_MAX_VALUE)
|
|
|
|
|
|
def _validate_long(datum, **kwargs):
    """
    Check that the data value is a non floating point number that fits in a
    signed 64-bit integer.

    Int64 = -9223372036854775808 <= datum <= 9223372036854775807

    conditional python types: int, numbers.Integral (``bool`` is rejected)

    Any extra keyword arguments are accepted and ignored. The result is always
    a plain ``bool``.
    """
    # bool is a subclass of int but must never validate as a long.
    if isinstance(datum, bool):
        return False
    if not isinstance(datum, (int, numbers.Integral)):
        return False
    # Some numbers.Integral implementations (for example NumPy scalars) return
    # their own boolean type from comparisons. Coerce to a real bool so that
    # callers checking ``result is False`` still detect the failure.
    return bool(LONG_MIN_VALUE <= datum <= LONG_MAX_VALUE)
|
|
|
|
|
|
def _validate_float(datum, **kwargs):
|
|
"""
|
|
Check that the data value is a floating
|
|
point number or double precision.
|
|
|
|
conditional python types
|
|
(int, float, numbers.Real)
|
|
"""
|
|
return isinstance(datum, (int, float, numbers.Real)) and not isinstance(datum, bool)
|
|
|
|
|
|
def _validate_fixed(datum, schema, **kwargs):
|
|
"""
|
|
Check that the data value is fixed width bytes,
|
|
matching the schema['size'] exactly!
|
|
"""
|
|
return isinstance(datum, bytes) and len(datum) == schema["size"]
|
|
|
|
|
|
def _validate_enum(datum, schema, **kwargs):
|
|
"""Check that the data value matches one of the enum symbols."""
|
|
return datum in schema["symbols"]
|
|
|
|
|
|
def _validate_array(datum, schema, named_schemas, parent_ns, raise_errors, options):
|
|
"""Check that the data list values all match schema['items']."""
|
|
return (
|
|
isinstance(datum, (Sequence, array.array))
|
|
and not isinstance(datum, str)
|
|
and all(
|
|
_validate(
|
|
datum=d,
|
|
schema=schema["items"],
|
|
named_schemas=named_schemas,
|
|
field=parent_ns,
|
|
raise_errors=raise_errors,
|
|
options=options,
|
|
)
|
|
for d in datum
|
|
)
|
|
)
|
|
|
|
|
|
def _validate_map(datum, schema, named_schemas, parent_ns, raise_errors, options):
|
|
"""
|
|
Check that the data is a Map(k,v)
|
|
matching values to schema['values'] type.
|
|
"""
|
|
return (
|
|
isinstance(datum, Mapping)
|
|
and all(isinstance(k, str) for k in datum)
|
|
and all(
|
|
_validate(
|
|
datum=v,
|
|
schema=schema["values"],
|
|
named_schemas=named_schemas,
|
|
field=parent_ns,
|
|
raise_errors=raise_errors,
|
|
options=options,
|
|
)
|
|
for v in datum.values()
|
|
)
|
|
)
|
|
|
|
|
|
def _validate_record(datum, schema, named_schemas, parent_ns, raise_errors, options):
    """
    Check that the data is a Mapping in which every field defined by the
    schema validates.

    A datum carrying a "-type" key is rejected when that value differs from
    the record's full name. A field missing from the datum is validated using
    the field's schema "default", or the NoValue sentinel when there is none.
    """
    _, fullname = schema_name(schema, parent_ns)

    if not isinstance(datum, Mapping):
        return False
    if "-type" in datum and datum["-type"] != fullname:
        return False

    for field_schema in schema["fields"]:
        field_name = field_schema["name"]
        value = datum.get(field_name, field_schema.get("default", NoValue))
        field_ok = _validate(
            datum=value,
            schema=field_schema["type"],
            named_schemas=named_schemas,
            field=f"{fullname}.{field_name}",
            raise_errors=raise_errors,
            options=options,
        )
        if not field_ok:
            return False
    return True
|
|
|
|
|
|
def _validate_union(datum, schema, named_schemas, parent_ns, raise_errors, options):
|
|
"""
|
|
Check that the data is a list type with possible options to
|
|
validate as True.
|
|
"""
|
|
if isinstance(datum, tuple) and not options.get("disable_tuple_notation"):
|
|
(name, datum) = datum
|
|
for candidate in schema:
|
|
if extract_record_type(candidate) == "record":
|
|
schema_name = candidate["name"]
|
|
else:
|
|
schema_name = candidate
|
|
if schema_name == name:
|
|
return _validate(
|
|
datum,
|
|
schema=candidate,
|
|
named_schemas=named_schemas,
|
|
field=parent_ns,
|
|
raise_errors=raise_errors,
|
|
options=options,
|
|
)
|
|
else:
|
|
return False
|
|
|
|
errors = []
|
|
for s in schema:
|
|
try:
|
|
ret = _validate(
|
|
datum,
|
|
schema=s,
|
|
named_schemas=named_schemas,
|
|
field=parent_ns,
|
|
raise_errors=raise_errors,
|
|
options=options,
|
|
)
|
|
if ret:
|
|
# We exit on the first passing type in Unions
|
|
return True
|
|
except ValidationError as e:
|
|
errors.extend(e.errors)
|
|
if raise_errors:
|
|
raise ValidationError(*errors)
|
|
return False
|
|
|
|
|
|
# Dispatch table used by _validate: maps the type name returned by
# extract_record_type() to the function that validates a datum of that type.
# Type names not listed here are looked up in named_schemas by _validate.
VALIDATORS = {
    # Primitive types
    "null": _validate_null,
    "boolean": _validate_boolean,
    "string": _validate_string,
    "int": _validate_int,
    "long": _validate_long,
    # float and double share one validator
    "float": _validate_float,
    "double": _validate_float,
    "bytes": _validate_bytes,
    # Complex types
    "fixed": _validate_fixed,
    "enum": _validate_enum,
    "array": _validate_array,
    "map": _validate_map,
    "union": _validate_union,
    "error_union": _validate_union,
    # record, error and request share one validator
    "record": _validate_record,
    "error": _validate_record,
    "request": _validate_record,
}
|
|
|
|
|
|
def _validate(datum, schema, named_schemas, field, raise_errors, options):
    """
    Validate ``datum`` against an already-parsed ``schema``.

    A NoValue datum fails outright when the "strict" option is set, and is
    otherwise treated as None. If the schema has a logical type with a
    registered writer, the datum is converted with it before validation.
    The datum is then handed to the validator registered for the schema's
    type, or validated against the matching entry of ``named_schemas``.

    Raises UnknownType when the type is neither registered nor a named
    schema, and ValidationError when ``raise_errors`` is true and the result
    is False. Otherwise returns the validation result.
    """
    # This function expects the schema to already be parsed
    record_type = extract_record_type(schema)

    value_missing = datum is NoValue
    if value_missing and options.get("strict"):
        result = False
    else:
        if value_missing:
            datum = None

        logical_type = extract_logical_type(schema)
        prepare = LOGICAL_WRITERS.get(logical_type) if logical_type else None
        if prepare:
            datum = prepare(datum, schema)

        validator = VALIDATORS.get(record_type)
        if validator:
            result = validator(
                datum,
                schema=schema,
                named_schemas=named_schemas,
                parent_ns=field,
                raise_errors=raise_errors,
                options=options,
            )
        elif record_type in named_schemas:
            # A reference to a named type: validate against its definition.
            result = _validate(
                datum,
                schema=named_schemas[record_type],
                named_schemas=named_schemas,
                field=field,
                raise_errors=raise_errors,
                options=options,
            )
        else:
            raise UnknownType(record_type)

    if raise_errors and result is False:
        raise ValidationError(ValidationErrorData(datum, schema, field))

    return result
|
|
|
|
|
|
def validate(
    datum: Any,
    schema: Schema,
    field: str = "",
    raise_errors: bool = True,
    strict: bool = False,
    disable_tuple_notation: bool = False,
) -> bool:
    """
    Check whether a python datum conforms to a schema.

    The schema is parsed first and the datum is then validated against the
    parsed result.

    Parameters
    ----------
    datum
        Data being validated
    schema
        Schema
    field
        Record field being validated
    raise_errors
        If true, errors are raised for invalid data. If false, a simple
        True (valid) or False (invalid) result is returned
    strict
        If true, fields without values will raise errors rather than implicitly
        defaulting to None
    disable_tuple_notation
        If set to True, tuples will not be treated as a special case. Therefore,
        using a tuple to indicate the type of a record will not work


    Example::

        from fastavro.validation import validate
        schema = {...}
        record = {...}
        validate(record, schema)
    """
    named_schemas: NamedSchemas = {}
    parsed_schema = parse_schema(schema, named_schemas)
    validation_options = {
        "strict": strict,
        "disable_tuple_notation": disable_tuple_notation,
    }
    return _validate(
        datum=datum,
        schema=parsed_schema,
        named_schemas=named_schemas,
        field=field,
        raise_errors=raise_errors,
        options=validation_options,
    )
|
|
|
|
|
|
def validate_many(
    records: Iterable[Any],
    schema: Schema,
    raise_errors: bool = True,
    strict: bool = False,
    disable_tuple_notation: bool = False,
) -> bool:
    """
    Validate an iterable of records against a schema.

    The schema is parsed once. Every record is validated, even after an
    earlier one has failed, and the errors from all records are collected
    and raised together.

    Parameters
    ----------
    records
        Iterable of records to validate
    schema
        Schema
    raise_errors
        If true, errors are raised for invalid data. If false, a simple
        True (valid) or False (invalid) result is returned
    strict
        If true, fields without values will raise errors rather than implicitly
        defaulting to None
    disable_tuple_notation
        If set to True, tuples will not be treated as a special case. Therefore,
        using a tuple to indicate the type of a record will not work

    Returns
    -------
    bool
        True when every validated record is valid (also for an empty
        iterable), False otherwise.

    Raises
    ------
    ValidationError
        When ``raise_errors`` is true and at least one validation error was
        collected. It carries the errors of all failing records.


    Example::

        from fastavro.validation import validate_many
        schema = {...}
        records = [{...}, {...}, ...]
        validate_many(records, schema)
    """
    named_schemas: NamedSchemas = {}
    parsed_schema = parse_schema(schema, named_schemas)

    # The options are the same for every record, so build them once.
    options = {
        "strict": strict,
        "disable_tuple_notation": disable_tuple_notation,
    }

    errors = []
    # A single flag replaces a per-record result list, so memory use does not
    # grow with the number of records.
    all_valid = True
    for record in records:
        try:
            record_ok = _validate(
                record,
                parsed_schema,
                named_schemas,
                field="",
                raise_errors=raise_errors,
                options=options,
            )
        except ValidationError as e:
            errors.extend(e.errors)
        else:
            if not record_ok:
                all_valid = False

    if raise_errors and errors:
        raise ValidationError(*errors)
    return all_valid
|