mirror of
https://github.com/qgis/QGIS.git
synced 2025-02-25 00:58:06 -05:00
146 lines
4.5 KiB
Python
146 lines
4.5 KiB
Python
|
# =============================================================================
|
||
|
# OWSLib. Copyright (C) 2015 Jachym Cepicky
|
||
|
#
|
||
|
# Contact email: jachym.cepicky@gmail.com
|
||
|
#
|
||
|
# =============================================================================
|
||
|
"""
|
||
|
Set of functions, which are suitable for DescribeFeatureType parsing and
|
||
|
generating layer schema description compatible with `fiona`
|
||
|
"""
|
||
|
|
||
|
import cgi, sys
|
||
|
from owslib.util import openURL
|
||
|
try:
|
||
|
from urllib import urlencode
|
||
|
except ImportError:
|
||
|
from urllib.parse import urlencode
|
||
|
from owslib.etree import etree
|
||
|
from owslib.namespaces import Namespaces
|
||
|
from owslib.util import which_etree, findall
|
||
|
|
||
|
MYNS = Namespaces()
|
||
|
XS_NAMESPACE = MYNS.get_namespace('xs')
|
||
|
GML_NAMESPACES = (MYNS.get_namespace('gml'),
|
||
|
MYNS.get_namespace('gml311'),
|
||
|
MYNS.get_namespace('gml32'))
|
||
|
|
||
|
|
||
|
def get_schema(url, typename, version='1.0.0', timeout=30):
|
||
|
"""Parses DescribeFeatureType response and creates schema compatible
|
||
|
with :class:`fiona`
|
||
|
|
||
|
:param str url: url of the service
|
||
|
:param str version: version of the service
|
||
|
:param str typename: name of the layer
|
||
|
:param int timeout: request timeout
|
||
|
"""
|
||
|
|
||
|
url = _get_describefeaturetype_url(url, version, typename)
|
||
|
res = openURL(url, timeout=timeout)
|
||
|
root = etree.fromstring(res.read())
|
||
|
type_element = findall(root, '{%s}element' % XS_NAMESPACE,
|
||
|
attribute_name='name', attribute_value=typename)[0]
|
||
|
complex_type = type_element.attrib['type'].split(":")[1]
|
||
|
elements = _get_elements(complex_type, root)
|
||
|
nsmap = None
|
||
|
if hasattr(root, 'nsmap'):
|
||
|
nsmap = root.nsmap
|
||
|
return _construct_schema(elements, nsmap)
|
||
|
|
||
|
|
||
|
def _get_elements(complex_type, root):
|
||
|
"""Get attribute elements
|
||
|
"""
|
||
|
|
||
|
found_elements = []
|
||
|
element = findall(root, '{%s}complexType' % XS_NAMESPACE,
|
||
|
attribute_name='name', attribute_value=complex_type)[0]
|
||
|
found_elements = findall(element, '{%s}element' % XS_NAMESPACE)
|
||
|
|
||
|
return found_elements
|
||
|
|
||
|
def _construct_schema(elements, nsmap):
|
||
|
"""Consruct fiona schema based on given elements
|
||
|
|
||
|
:param list Element: list of elements
|
||
|
:param dict nsmap: namespace map
|
||
|
|
||
|
:return dict: schema
|
||
|
"""
|
||
|
|
||
|
schema = {
|
||
|
'properties': {},
|
||
|
'geometry': None
|
||
|
}
|
||
|
|
||
|
schema_key = None
|
||
|
gml_key = None
|
||
|
|
||
|
# if nsmap is defined, use it
|
||
|
if nsmap:
|
||
|
for key in nsmap:
|
||
|
if nsmap[key] == XS_NAMESPACE:
|
||
|
schema_key = key
|
||
|
if nsmap[key] in GML_NAMESPACES:
|
||
|
gml_key = key
|
||
|
# if no nsmap is defined, we have to guess
|
||
|
else:
|
||
|
gml_key = 'gml'
|
||
|
schema_key = 'xsd'
|
||
|
|
||
|
for element in elements:
|
||
|
data_type = element.attrib['type'].replace(gml_key + ':', '')
|
||
|
name = element.attrib['name']
|
||
|
|
||
|
if data_type == 'PointPropertyType':
|
||
|
schema['geometry'] = 'Point'
|
||
|
elif data_type == 'PolygonPropertyType':
|
||
|
schema['geometry'] = 'Polygon'
|
||
|
elif data_type == 'LineStringPropertyType':
|
||
|
schema['geometry'] = 'LineString'
|
||
|
elif data_type == 'MultiPointPropertyType':
|
||
|
schema['geometry'] = 'MultiPoint'
|
||
|
elif data_type == 'MultiLineStringPropertyType':
|
||
|
schema['geometry'] = 'MultiLineString'
|
||
|
elif data_type == 'MultiPolygonPropertyType':
|
||
|
schema['geometry'] = 'MultiPolygon'
|
||
|
elif data_type == 'MultiGeometryPropertyType':
|
||
|
schema['geometry'] = 'MultiGeometry'
|
||
|
elif data_type == 'GeometryPropertyType':
|
||
|
schema['geometry'] = 'GeometryCollection'
|
||
|
elif data_type == 'SurfacePropertyType':
|
||
|
schema['geometry'] = '3D Polygon'
|
||
|
else:
|
||
|
schema['properties'][name] = data_type.replace(schema_key+':', '')
|
||
|
|
||
|
if schema['properties'] or schema['geometry']:
|
||
|
return schema
|
||
|
else:
|
||
|
return None
|
||
|
|
||
|
def _get_describefeaturetype_url(url, version, typename):
|
||
|
"""Get url for describefeaturetype request
|
||
|
|
||
|
:return str: url
|
||
|
"""
|
||
|
|
||
|
query_string = []
|
||
|
if url.find('?') != -1:
|
||
|
query_string = cgi.parse_qsl(url.split('?')[1])
|
||
|
|
||
|
params = [x[0] for x in query_string]
|
||
|
|
||
|
if 'service' not in params:
|
||
|
query_string.append(('service', 'WFS'))
|
||
|
if 'request' not in params:
|
||
|
query_string.append(('request', 'DescribeFeatureType'))
|
||
|
if 'version' not in params:
|
||
|
query_string.append(('version', version))
|
||
|
|
||
|
query_string.append(('typeName', typename))
|
||
|
|
||
|
urlqs = urlencode(tuple(query_string))
|
||
|
return url.split('?')[0] + '?' + urlqs
|
||
|
|