# -*- coding: ISO-8859-15 -*- # ============================================================================= # Copyright (c) 2004, 2006 Sean C. Gillies # Copyright (c) 2005 Nuxeo SARL # # Authors : Sean Gillies # Julien Anguenot # # Contact email: sgillies@frii.com # ============================================================================= """ API for Web Map Service (WMS) methods and metadata. Currently supports only version 1.1.1 of the WMS protocol. """ import cgi import urllib2 from urllib import urlencode import warnings from etree import etree from .util import openURL, testXMLValue, extract_xml_list, xmltag_split from fgdc import Metadata from iso import MD_Metadata class ServiceException(Exception): """WMS ServiceException Attributes: message -- short error message xml -- full xml error message from server """ def __init__(self, message, xml): self.message = message self.xml = xml def __str__(self): return repr(self.message) class CapabilitiesError(Exception): pass class WebMapService(object): """Abstraction for OGC Web Map Service (WMS). Implements IWebMapService. """ def __getitem__(self,name): ''' check contents dictionary to allow dict like access to service layers''' if name in self.__getattribute__('contents').keys(): return self.__getattribute__('contents')[name] else: raise KeyError, "No content named %s" % name def __init__(self, url, version='1.1.1', xml=None, username=None, password=None, parse_remote_metadata=False ): """Initialize.""" self.url = url self.username = username self.password = password self.version = version self._capabilities = None # Authentication handled by Reader reader = WMSCapabilitiesReader( self.version, url=self.url, un=self.username, pw=self.password ) if xml: # read from stored xml self._capabilities = reader.readString(xml) else: # read from server self._capabilities = reader.read(self.url) # avoid building capabilities metadata if the response is a ServiceExceptionReport se = self._capabilities.find('ServiceException') if se is not None: err_message = str(se.text).strip() raise ServiceException(err_message, xml) # build metadata objects self._buildMetadata(parse_remote_metadata) def _getcapproperty(self): if not self._capabilities: reader = WMSCapabilitiesReader( self.version, url=self.url, un=self.username, pw=self.password ) self._capabilities = ServiceMetadata(reader.read(self.url)) return self._capabilities def _buildMetadata(self, parse_remote_metadata=False): ''' set up capabilities metadata objects ''' #serviceIdentification metadata serviceelem=self._capabilities.find('Service') self.identification=ServiceIdentification(serviceelem, self.version) #serviceProvider metadata self.provider=ServiceProvider(serviceelem) #serviceOperations metadata self.operations=[] for elem in self._capabilities.find('Capability/Request')[:]: self.operations.append(OperationMetadata(elem)) #serviceContents metadata: our assumption is that services use a top-level #layer as a metadata organizer, nothing more. self.contents={} caps = self._capabilities.find('Capability') #recursively gather content metadata for all layer elements. #To the WebMapService.contents store only metadata of named layers. def gather_layers(parent_elem, parent_metadata): for index, elem in enumerate(parent_elem.findall('Layer')): cm = ContentMetadata(elem, parent=parent_metadata, index=index+1, parse_remote_metadata=parse_remote_metadata) if cm.id: if cm.id in self.contents: warnings.warn('Content metadata for layer "%s" already exists. Using child layer' % cm.id) self.contents[cm.id] = cm gather_layers(elem, cm) gather_layers(caps, None) #exceptions self.exceptions = [f.text for f \ in self._capabilities.findall('Capability/Exception/Format')] def items(self): '''supports dict-like items() access''' items=[] for item in self.contents: items.append((item,self.contents[item])) return items def getcapabilities(self): """Request and return capabilities document from the WMS as a file-like object. NOTE: this is effectively redundant now""" reader = WMSCapabilitiesReader( self.version, url=self.url, un=self.username, pw=self.password ) u = self._open(reader.capabilities_url(self.url)) # check for service exceptions, and return if u.info().gettype() == 'application/vnd.ogc.se_xml': se_xml = u.read() se_tree = etree.fromstring(se_xml) err_message = str(se_tree.find('ServiceException').text).strip() raise ServiceException(err_message, se_xml) return u def getmap(self, layers=None, styles=None, srs=None, bbox=None, format=None, size=None, time=None, transparent=False, bgcolor='#FFFFFF', exceptions='application/vnd.ogc.se_xml', method='Get', **kwargs ): """Request and return an image from the WMS as a file-like object. Parameters ---------- layers : list List of content layer names. styles : list Optional list of named styles, must be the same length as the layers list. srs : string A spatial reference system identifier. bbox : tuple (left, bottom, right, top) in srs units. format : string Output image format such as 'image/jpeg'. size : tuple (width, height) in pixels. transparent : bool Optional. Transparent background if True. bgcolor : string Optional. Image background color. method : string Optional. HTTP DCP method name: Get or Post. **kwargs : extra arguments anything else e.g. vendor specific parameters Example ------- >>> wms = WebMapService('http://giswebservices.massgis.state.ma.us/geoserver/wms', version='1.1.1') >>> img = wms.getmap(layers=['massgis:GISDATA.SHORELINES_ARC'],\ styles=[''],\ srs='EPSG:4326',\ bbox=(-70.8, 42, -70, 42.8),\ size=(300, 300),\ format='image/jpeg',\ transparent=True) >>> out = open('example.jpg.jpg', 'wb') >>> out.write(img.read()) >>> out.close() """ try: base_url = next((m.get('url') for m in self.getOperationByName('GetMap').methods if m.get('type').lower() == method.lower())) except StopIteration: base_url = self.url request = {'version': self.version, 'request': 'GetMap'} # check layers and styles assert len(layers) > 0 request['layers'] = ','.join(layers) if styles: assert len(styles) == len(layers) request['styles'] = ','.join(styles) else: request['styles'] = '' # size request['width'] = str(size[0]) request['height'] = str(size[1]) request['srs'] = str(srs) request['bbox'] = ','.join([repr(x) for x in bbox]) request['format'] = str(format) request['transparent'] = str(transparent).upper() request['bgcolor'] = '0x' + bgcolor[1:7] request['exceptions'] = str(exceptions) if time is not None: request['time'] = str(time) if kwargs: for kw in kwargs: request[kw]=kwargs[kw] data = urlencode(request) u = openURL(base_url, data, method, username = self.username, password = self.password) # check for service exceptions, and return if u.info()['Content-Type'] == 'application/vnd.ogc.se_xml': se_xml = u.read() se_tree = etree.fromstring(se_xml) err_message = unicode(se_tree.find('ServiceException').text).strip() raise ServiceException(err_message, se_xml) return u def getServiceXML(self): xml = None if self._capabilities is not None: xml = etree.tostring(self._capabilities) return xml def getfeatureinfo(self): raise NotImplementedError def getOperationByName(self, name): """Return a named content item.""" for item in self.operations: if item.name == name: return item raise KeyError, "No operation named %s" % name class ServiceIdentification(object): ''' Implements IServiceIdentificationMetadata ''' def __init__(self, infoset, version): self._root=infoset self.type = testXMLValue(self._root.find('Name')) self.version = version self.title = testXMLValue(self._root.find('Title')) self.abstract = testXMLValue(self._root.find('Abstract')) self.keywords = extract_xml_list(self._root.findall('KeywordList/Keyword')) self.accessconstraints = testXMLValue(self._root.find('AccessConstraints')) self.fees = testXMLValue(self._root.find('Fees')) class ServiceProvider(object): ''' Implements IServiceProviderMetatdata ''' def __init__(self, infoset): self._root=infoset name=self._root.find('ContactInformation/ContactPersonPrimary/ContactOrganization') if name is not None: self.name=name.text else: self.name=None self.url=self._root.find('OnlineResource').attrib.get('{http://www.w3.org/1999/xlink}href', '') #contact metadata contact = self._root.find('ContactInformation') ## sometimes there is a contact block that is empty, so make ## sure there are children to parse if contact is not None and contact[:] != []: self.contact = ContactMetadata(contact) else: self.contact = None def getContentByName(self, name): """Return a named content item.""" for item in self.contents: if item.name == name: return item raise KeyError, "No content named %s" % name def getOperationByName(self, name): """Return a named content item.""" for item in self.operations: if item.name == name: return item raise KeyError, "No operation named %s" % name class ContentMetadata: """ Abstraction for WMS layer metadata. Implements IContentMetadata. """ def __init__(self, elem, parent=None, index=0, parse_remote_metadata=False, timeout=30): if elem.tag != 'Layer': raise ValueError('%s should be a Layer' % (elem,)) self.parent = parent if parent: self.index = "%s.%d" % (parent.index, index) else: self.index = str(index) self.id = self.name = testXMLValue(elem.find('Name')) # layer attributes self.queryable = int(elem.attrib.get('queryable', 0)) self.cascaded = int(elem.attrib.get('cascaded', 0)) self.opaque = int(elem.attrib.get('opaque', 0)) self.noSubsets = int(elem.attrib.get('noSubsets', 0)) self.fixedWidth = int(elem.attrib.get('fixedWidth', 0)) self.fixedHeight = int(elem.attrib.get('fixedHeight', 0)) # title is mandatory property self.title = None title = testXMLValue(elem.find('Title')) if title is not None: self.title = title.strip() self.abstract = testXMLValue(elem.find('Abstract')) # bboxes b = elem.find('BoundingBox') self.boundingBox = None if b is not None: try: #sometimes the SRS attribute is (wrongly) not provided srs=b.attrib['SRS'] except KeyError: srs=None self.boundingBox = ( float(b.attrib['minx']), float(b.attrib['miny']), float(b.attrib['maxx']), float(b.attrib['maxy']), srs, ) elif self.parent: if hasattr(self.parent, 'boundingBox'): self.boundingBox = self.parent.boundingBox # ScaleHint sh = elem.find('ScaleHint') self.scaleHint = None if sh is not None: self.scaleHint = {'min': sh.attrib['min'], 'max': sh.attrib['max']} attribution = elem.find('Attribution') if attribution is not None: self.attribution = dict() title = attribution.find('Title') url = attribution.find('OnlineResource') logo = attribution.find('LogoURL') if title is not None: self.attribution['title'] = title.text if url is not None: self.attribution['url'] = url.attrib['{http://www.w3.org/1999/xlink}href'] if logo is not None: self.attribution['logo_size'] = (int(logo.attrib['width']), int(logo.attrib['height'])) self.attribution['logo_url'] = logo.find('OnlineResource').attrib['{http://www.w3.org/1999/xlink}href'] b = elem.find('LatLonBoundingBox') if b is not None: self.boundingBoxWGS84 = ( float(b.attrib['minx']), float(b.attrib['miny']), float(b.attrib['maxx']), float(b.attrib['maxy']), ) elif self.parent: self.boundingBoxWGS84 = self.parent.boundingBoxWGS84 else: self.boundingBoxWGS84 = None #SRS options self.crsOptions = [] #Copy any parent SRS options (they are inheritable properties) if self.parent: self.crsOptions = list(self.parent.crsOptions) #Look for SRS option attached to this layer if elem.find('SRS') is not None: ## some servers found in the wild use a single SRS ## tag containing a whitespace separated list of SRIDs ## instead of several SRS tags. hence the inner loop for srslist in map(lambda x: x.text, elem.findall('SRS')): if srslist: for srs in srslist.split(): self.crsOptions.append(srs) #Get rid of duplicate entries self.crsOptions = list(set(self.crsOptions)) #Set self.crsOptions to None if the layer (and parents) had no SRS options if len(self.crsOptions) == 0: #raise ValueError('%s no SRS available!?' % (elem,)) #Comment by D Lowe. #Do not raise ValueError as it is possible that a layer is purely a parent layer and does not have SRS specified. Instead set crsOptions to None # Comment by Jachym: # Do not set it to None, but to [], which will make the code # work further. Fixed by anthonybaxter self.crsOptions=[] #Styles self.styles = {} #Copy any parent styles (they are inheritable properties) if self.parent: self.styles = self.parent.styles.copy() #Get the styles for this layer (items with the same name are replaced) for s in elem.findall('Style'): name = s.find('Name') title = s.find('Title') if name is None or title is None: raise ValueError('%s missing name or title' % (s,)) style = { 'title' : title.text } # legend url legend = s.find('LegendURL/OnlineResource') if legend is not None: style['legend'] = legend.attrib['{http://www.w3.org/1999/xlink}href'] self.styles[name.text] = style # keywords self.keywords = [f.text for f in elem.findall('KeywordList/Keyword')] # timepositions - times for which data is available. self.timepositions=None self.defaulttimeposition = None for extent in elem.findall('Extent'): if extent.attrib.get("name").lower() =='time': if extent.text: self.timepositions=extent.text.split(',') self.defaulttimeposition = extent.attrib.get("default") break # Elevations - available vertical levels self.elevations=None for extent in elem.findall('Extent'): if extent.attrib.get("name").lower() =='elevation': if extent.text: self.elevations=extent.text.split(',') break # MetadataURLs self.metadataUrls = [] for m in elem.findall('MetadataURL'): metadataUrl = { 'type': testXMLValue(m.attrib['type'], attrib=True), 'format': testXMLValue(m.find('Format')), 'url': testXMLValue(m.find('OnlineResource').attrib['{http://www.w3.org/1999/xlink}href'], attrib=True) } if metadataUrl['url'] is not None and parse_remote_metadata: # download URL try: content = urllib2.urlopen(metadataUrl['url'], timeout=timeout) doc = etree.parse(content) if metadataUrl['type'] is not None: if metadataUrl['type'] == 'FGDC': metadataUrl['metadata'] = Metadata(doc) if metadataUrl['type'] == 'TC211': metadataUrl['metadata'] = MD_Metadata(doc) except Exception, err: metadataUrl['metadata'] = None self.metadataUrls.append(metadataUrl) # DataURLs self.dataUrls = [] for m in elem.findall('DataURL'): dataUrl = { 'format': m.find('Format').text.strip(), 'url': m.find('OnlineResource').attrib['{http://www.w3.org/1999/xlink}href'] } self.dataUrls.append(dataUrl) self.layers = [] for child in elem.findall('Layer'): self.layers.append(ContentMetadata(child, self)) def __str__(self): return 'Layer Name: %s Title: %s' % (self.name, self.title) class OperationMetadata: """Abstraction for WMS OperationMetadata. Implements IOperationMetadata. """ def __init__(self, elem): """.""" self.name = xmltag_split(elem.tag) # formatOptions self.formatOptions = [f.text for f in elem.findall('Format')] self.methods = [] for verb in elem.findall('DCPType/HTTP/*'): url = verb.find('OnlineResource').attrib['{http://www.w3.org/1999/xlink}href'] self.methods.append({'type' : xmltag_split(verb.tag), 'url': url}) class ContactMetadata: """Abstraction for contact details advertised in GetCapabilities. """ def __init__(self, elem): name = elem.find('ContactPersonPrimary/ContactPerson') if name is not None: self.name=name.text else: self.name=None email = elem.find('ContactElectronicMailAddress') if email is not None: self.email=email.text else: self.email=None self.address = self.city = self.region = None self.postcode = self.country = None address = elem.find('ContactAddress') if address is not None: street = address.find('Address') if street is not None: self.address = street.text city = address.find('City') if city is not None: self.city = city.text region = address.find('StateOrProvince') if region is not None: self.region = region.text postcode = address.find('PostCode') if postcode is not None: self.postcode = postcode.text country = address.find('Country') if country is not None: self.country = country.text organization = elem.find('ContactPersonPrimary/ContactOrganization') if organization is not None: self.organization = organization.text else:self.organization = None position = elem.find('ContactPosition') if position is not None: self.position = position.text else: self.position = None class WMSCapabilitiesReader: """Read and parse capabilities document into a lxml.etree infoset """ def __init__(self, version='1.1.1', url=None, un=None, pw=None): """Initialize""" self.version = version self._infoset = None self.url = url self.username = un self.password = pw #if self.username and self.password: ## Provide login information in order to use the WMS server ## Create an OpenerDirector with support for Basic HTTP ## Authentication... #passman = HTTPPasswordMgrWithDefaultRealm() #passman.add_password(None, self.url, self.username, self.password) #auth_handler = HTTPBasicAuthHandler(passman) #opener = build_opener(auth_handler) #self._open = opener.open def capabilities_url(self, service_url): """Return a capabilities url """ qs = [] if service_url.find('?') != -1: qs = cgi.parse_qsl(service_url.split('?')[1]) params = [x[0] for x in qs] if 'service' not in params: qs.append(('service', 'WMS')) if 'request' not in params: qs.append(('request', 'GetCapabilities')) if 'version' not in params: qs.append(('version', self.version)) urlqs = urlencode(tuple(qs)) return service_url.split('?')[0] + '?' + urlqs def read(self, service_url): """Get and parse a WMS capabilities document, returning an elementtree instance service_url is the base url, to which is appended the service, version, and request parameters """ getcaprequest = self.capabilities_url(service_url) #now split it up again to use the generic openURL function... spliturl=getcaprequest.split('?') u = openURL(spliturl[0], spliturl[1], method='Get', username = self.username, password = self.password) return etree.fromstring(u.read()) def readString(self, st): """Parse a WMS capabilities document, returning an elementtree instance string should be an XML capabilities document """ if not isinstance(st, str): raise ValueError("String must be of type string, not %s" % type(st)) return etree.fromstring(st)