2025-05-16 18:00:22 +04:00

374 lines
10 KiB
Python

import array
import numbers
from collections.abc import Mapping, Sequence
from typing import Any, Iterable
from .const import INT_MAX_VALUE, INT_MIN_VALUE, LONG_MAX_VALUE, LONG_MIN_VALUE
from ._validate_common import ValidationError, ValidationErrorData
from .schema import extract_record_type, extract_logical_type, schema_name, parse_schema
from .logical_writers import LOGICAL_WRITERS
from ._schema_common import UnknownType
from .types import Schema, NamedSchemas
# Sentinel used when a record field is absent from the datum and the schema
# supplies no default. It is kept distinct from None so that strict validation
# can tell "no value at all" apart from an explicit null.
NoValue = object()
def _validate_null(datum, **kwargs):
"""Checks that the data value is None."""
return datum is None
def _validate_boolean(datum, **kwargs):
"""Check that the data value is bool instance"""
return isinstance(datum, bool)
def _validate_string(datum, **kwargs):
"""Check that the data value is string"""
return isinstance(datum, str)
def _validate_bytes(datum, **kwargs):
"""Check that the data value is python bytes type"""
return isinstance(datum, (bytes, bytearray))
def _validate_int(datum, **kwargs):
    """Check that ``datum`` is an integral number that fits an Avro ``int``.

    The value must be a ``numbers.Integral`` (which covers the built-in
    ``int``), must lie within ``INT_MIN_VALUE <= datum <= INT_MAX_VALUE``
    (the signed 32-bit range, -2147483648..2147483647) and must not be a
    ``bool``.

    Extra keyword arguments are accepted and ignored.
    """
    if not isinstance(datum, numbers.Integral):
        return False
    in_range = INT_MIN_VALUE <= datum <= INT_MAX_VALUE
    # ``bool`` is a subclass of ``int``; it is excluded last so the value
    # returned for out-of-range input is the comparison result itself.
    return in_range and not isinstance(datum, bool)
def _validate_long(datum, **kwargs):
    """Check that ``datum`` is an integral number that fits an Avro ``long``.

    The value must be a ``numbers.Integral`` (which covers the built-in
    ``int``), must lie within ``LONG_MIN_VALUE <= datum <= LONG_MAX_VALUE``
    (the signed 64-bit range, -9223372036854775808..9223372036854775807)
    and must not be a ``bool``.

    Extra keyword arguments are accepted and ignored.
    """
    if not isinstance(datum, numbers.Integral):
        return False
    in_range = LONG_MIN_VALUE <= datum <= LONG_MAX_VALUE
    # ``bool`` is a subclass of ``int``; it is excluded last so the value
    # returned for out-of-range input is the comparison result itself.
    return in_range and not isinstance(datum, bool)
def _validate_float(datum, **kwargs):
"""
Check that the data value is a floating
point number or double precision.
conditional python types
(int, float, numbers.Real)
"""
return isinstance(datum, (int, float, numbers.Real)) and not isinstance(datum, bool)
def _validate_fixed(datum, schema, **kwargs):
"""
Check that the data value is fixed width bytes,
matching the schema['size'] exactly!
"""
return isinstance(datum, bytes) and len(datum) == schema["size"]
def _validate_enum(datum, schema, **kwargs):
"""Check that the data value matches one of the enum symbols."""
return datum in schema["symbols"]
def _validate_array(datum, schema, named_schemas, parent_ns, raise_errors, options):
    """Check that ``datum`` is a sequence whose elements all match ``schema["items"]``.

    ``datum`` must be a ``Sequence`` or an ``array.array``; ``str`` is
    rejected even though it is a sequence. Elements are validated in order
    and checking stops at the first element that fails.
    """
    if not isinstance(datum, (Sequence, array.array)):
        return False
    if isinstance(datum, str):
        return False
    for element in datum:
        # ``schema["items"]`` is looked up per element so that an empty
        # datum never touches the schema.
        element_ok = _validate(
            datum=element,
            schema=schema["items"],
            named_schemas=named_schemas,
            field=parent_ns,
            raise_errors=raise_errors,
            options=options,
        )
        if not element_ok:
            return False
    return True
def _validate_map(datum, schema, named_schemas, parent_ns, raise_errors, options):
    """Check that ``datum`` is a mapping with string keys and valid values.

    ``datum`` must be a ``Mapping``, every key must be a ``str`` and every
    value must validate against ``schema["values"]``. All keys are checked
    before any value is validated.
    """
    if not isinstance(datum, Mapping):
        return False
    for key in datum:
        if not isinstance(key, str):
            return False
    for value in datum.values():
        value_ok = _validate(
            datum=value,
            schema=schema["values"],
            named_schemas=named_schemas,
            field=parent_ns,
            raise_errors=raise_errors,
            options=options,
        )
        if not value_ok:
            return False
    return True
def _validate_record(datum, schema, named_schemas, parent_ns, raise_errors, options):
    """Check that ``datum`` is a mapping whose fields all match the record schema.

    ``datum`` must be a ``Mapping``. If it carries a ``"-type"`` key, that
    value must equal the record's full name. Each schema field is then
    validated in declaration order, stopping at the first failure. A field
    missing from ``datum`` falls back to the field's ``"default"``; if
    there is no default either, the ``NoValue`` sentinel is validated
    instead.
    """
    _, fullname = schema_name(schema, parent_ns)
    if not isinstance(datum, Mapping):
        return False
    if "-type" in datum and datum["-type"] != fullname:
        return False
    for field_def in schema["fields"]:
        field_name = field_def["name"]
        value = datum.get(field_name, field_def.get("default", NoValue))
        field_ok = _validate(
            datum=value,
            schema=field_def["type"],
            named_schemas=named_schemas,
            field=f"{fullname}.{field_name}",
            raise_errors=raise_errors,
            options=options,
        )
        if not field_ok:
            return False
    return True
def _validate_union(datum, schema, named_schemas, parent_ns, raise_errors, options):
"""
Check that the data is a list type with possible options to
validate as True.
"""
if isinstance(datum, tuple) and not options.get("disable_tuple_notation"):
(name, datum) = datum
for candidate in schema:
if extract_record_type(candidate) == "record":
schema_name = candidate["name"]
else:
schema_name = candidate
if schema_name == name:
return _validate(
datum,
schema=candidate,
named_schemas=named_schemas,
field=parent_ns,
raise_errors=raise_errors,
options=options,
)
else:
return False
errors = []
for s in schema:
try:
ret = _validate(
datum,
schema=s,
named_schemas=named_schemas,
field=parent_ns,
raise_errors=raise_errors,
options=options,
)
if ret:
# We exit on the first passing type in Unions
return True
except ValidationError as e:
errors.extend(e.errors)
if raise_errors:
raise ValidationError(*errors)
return False
# Dispatch table from schema type name to the function that validates a datum
# of that type. "float" and "double" share one validator, "union" and
# "error_union" share one, and "record", "error" and "request" share one.
# Type names missing from this table are looked up in the named schemas by
# _validate.
VALIDATORS = {
"null": _validate_null,
"boolean": _validate_boolean,
"string": _validate_string,
"int": _validate_int,
"long": _validate_long,
"float": _validate_float,
"double": _validate_float,
"bytes": _validate_bytes,
"fixed": _validate_fixed,
"enum": _validate_enum,
"array": _validate_array,
"map": _validate_map,
"union": _validate_union,
"error_union": _validate_union,
"record": _validate_record,
"error": _validate_record,
"request": _validate_record,
}
def _validate(datum, schema, named_schemas, field, raise_errors, options):
    """Validate ``datum`` against an already parsed ``schema``.

    A ``NoValue`` datum fails immediately when ``options["strict"]`` is
    set; otherwise it is treated as ``None``. If the schema has a logical
    type with a registered writer, the datum is converted by that writer
    before validation. The validator is chosen from ``VALIDATORS`` by
    record type; type names not found there are resolved through
    ``named_schemas``.

    Raises ``UnknownType`` when the type is neither built in nor named, and
    ``ValidationError`` when the result is ``False`` and ``raise_errors``
    is true.
    """
    # This function expects the schema to already be parsed
    record_type = extract_record_type(schema)
    missing = datum is NoValue

    if missing and options.get("strict"):
        outcome = False
    else:
        if missing:
            datum = None

        logical_type = extract_logical_type(schema)
        prepare = LOGICAL_WRITERS.get(logical_type) if logical_type else None
        if prepare:
            datum = prepare(datum, schema)

        validator = VALIDATORS.get(record_type)
        if validator:
            outcome = validator(
                datum,
                schema=schema,
                named_schemas=named_schemas,
                parent_ns=field,
                raise_errors=raise_errors,
                options=options,
            )
        elif record_type in named_schemas:
            # A reference to a named type: validate against its definition.
            outcome = _validate(
                datum,
                schema=named_schemas[record_type],
                named_schemas=named_schemas,
                field=field,
                raise_errors=raise_errors,
                options=options,
            )
        else:
            raise UnknownType(record_type)

    if outcome is False and raise_errors:
        raise ValidationError(ValidationErrorData(datum, schema, field))
    return outcome
def validate(
    datum: Any,
    schema: Schema,
    field: str = "",
    raise_errors: bool = True,
    strict: bool = False,
    disable_tuple_notation: bool = False,
) -> bool:
    """
    Determine if a python datum is an instance of a schema.

    The schema is parsed first, then the datum is checked against the
    parsed result.

    Parameters
    ----------
    datum
        Data being validated
    schema
        Schema
    field
        Record field being validated
    raise_errors
        If true, errors are raised for invalid data. If false, a simple
        True (valid) or False (invalid) result is returned
    strict
        If true, fields without values will raise errors rather than
        implicitly defaulting to None
    disable_tuple_notation
        If set to True, tuples will not be treated as a special case.
        Therefore, using a tuple to indicate the type of a record will not
        work


    Example::

        from fastavro.validation import validate
        schema = {...}
        record = {...}
        validate(record, schema)
    """
    named_schemas: NamedSchemas = {}
    parsed_schema = parse_schema(schema, named_schemas)
    options = {
        "strict": strict,
        "disable_tuple_notation": disable_tuple_notation,
    }
    return _validate(
        datum,
        parsed_schema,
        named_schemas,
        field,
        raise_errors,
        options=options,
    )
def validate_many(
    records: Iterable[Any],
    schema: Schema,
    raise_errors: bool = True,
    strict: bool = False,
    disable_tuple_notation: bool = False,
) -> bool:
    """
    Validate every record in an iterable against one schema.

    The schema is parsed once. Every record is checked even after an
    earlier one fails, so that with ``raise_errors`` the raised
    ``ValidationError`` carries the errors of all failing records.

    Parameters
    ----------
    records
        Iterable of records to validate
    schema
        Schema
    raise_errors
        If true, errors are raised for invalid data. If false, a simple
        True (valid) or False (invalid) result is returned
    strict
        If true, fields without values will raise errors rather than
        implicitly defaulting to None
    disable_tuple_notation
        If set to True, tuples will not be treated as a special case.
        Therefore, using a tuple to indicate the type of a record will not
        work

    Returns
    -------
    True if every record is valid (also for an empty iterable), otherwise
    False.

    Raises
    ------
    ValidationError
        If ``raise_errors`` is true and at least one record is invalid


    Example::

        from fastavro.validation import validate_many
        schema = {...}
        records = [{...}, {...}, ...]
        validate_many(records, schema)
    """
    named_schemas: NamedSchemas = {}
    parsed_schema = parse_schema(schema, named_schemas)
    # The options never change between records, so build the dict once
    # instead of once per record.
    options = {
        "strict": strict,
        "disable_tuple_notation": disable_tuple_notation,
    }
    errors = []
    # A single flag replaces a per-record result list; only "were all of
    # them valid" is ever reported.
    all_valid = True
    for record in records:
        try:
            record_ok = _validate(
                record,
                parsed_schema,
                named_schemas,
                field="",
                raise_errors=raise_errors,
                options=options,
            )
        except ValidationError as e:
            errors.extend(e.errors)
        else:
            if not record_ok:
                all_valid = False
    if raise_errors and errors:
        raise ValidationError(*errors)
    return all_valid