mirror of
https://github.com/qgis/QGIS.git
synced 2025-02-25 00:58:06 -05:00
101 lines
3.8 KiB
Python
101 lines
3.8 KiB
Python
#WCS response decoder.
|
|
#Decodes response from a WCS (either a Coverages XML document or a Multipart MIME) and extracts the urls of the coverage data.
|
|
#Copyright (c) 2007 STFC <http://www.stfc.ac.uk>
|
|
#Author: Dominic Lowe, STFC
|
|
#contact email: d.lowe@rl.ac.uk
|
|
#
|
|
# Multipart MIME decoding based on http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/86676
|
|
|
|
#example: used in conjunction with ows lib wcs:
|
|
|
|
#from owslib import wcsdecoder
|
|
#u=wcs.getcoverage(identifier=['TuMYrRQ4'], timeSequence=['2792-06-01T00:00:00.0'], bbox=(-112,36,-106,41),format='application/netcdf', store='true')
|
|
#decoder=wcsdecoder.WCSDecoder(u)
|
|
#decoder.getCoverages()
|
|
|
|
from __future__ import (absolute_import, division, print_function)
|
|
|
|
import os
|
|
from owslib.etree import etree
|
|
import email
|
|
import errno
|
|
|
|
class WCSDecoder(object):
|
|
def __init__(self, u):
|
|
''' initiate with a urllib url object.'''
|
|
self.u=u
|
|
self._getType()
|
|
|
|
def _getType(self):
|
|
''' determine whether it is a Multipart Mime or a Coverages XML file'''
|
|
|
|
#what's the best way to test this?
|
|
#for now read start of file
|
|
tempu=self.u
|
|
if tempu.readline()[:14] == '<?xml version=':
|
|
self.urlType='XML'
|
|
else:
|
|
self.urlType='Multipart'
|
|
|
|
|
|
def getCoverages(self, unpackdir='./unpacked'):
|
|
if self.urlType=='XML':
|
|
paths=[]
|
|
u_xml = self.u.read()
|
|
u_tree = etree.fromstring(u_xml)
|
|
for ref in u_tree.findall('{http://www.opengis.net/wcs/1.1}Coverage/{http://www.opengis.net/wcs/1.1}Reference'):
|
|
path = ref.attrib['{http://www.w3.org/1999/xlink}href']
|
|
paths.append(path)
|
|
for ref in u_tree.findall('{http://www.opengis.net/wcs/1.1.0/owcs}Coverage/{{http://www.opengis.net/wcs/1.1.0/owcs}Reference'):
|
|
path = ref.attrib['{http://www.w3.org/1999/xlink}href']
|
|
paths.append(path)
|
|
elif self.urlType=='Multipart':
|
|
#Decode multipart mime and return fileobjects
|
|
u_mpart=self.u.read()
|
|
mpart =MpartMime(u_mpart)
|
|
paths= mpart.unpackToDir(unpackdir)
|
|
return paths
|
|
|
|
class MpartMime(object):
|
|
def __init__ (self,mpartmime):
|
|
""" mpartmime is a multipart mime file that has already been read in."""
|
|
self.mpartmime=mpartmime
|
|
|
|
def unpackToDir(self, unpackdir):
|
|
""" unpacks contents of Multipart mime to a given directory"""
|
|
|
|
names=[]
|
|
#create the directory if it doesn't exist:
|
|
try:
|
|
os.mkdir(unpackdir)
|
|
except OSError as e:
|
|
# Ignore directory exists error
|
|
if e.errno != errno.EEXIST:
|
|
raise
|
|
|
|
#now walk through the multipart mime and write out files
|
|
msg = email.message_from_string(self.mpartmime)
|
|
counter =1
|
|
for part in msg.walk():
|
|
# multipart/* are just containers, ignore
|
|
if part.get_content_maintype() == 'multipart':
|
|
continue
|
|
# Applications should really check the given filename so that an
|
|
# email message can't be used to overwrite important files
|
|
filename = part.get_filename()
|
|
if not filename:
|
|
try:
|
|
ext = mimetypes.guess_extension(part.get_type())
|
|
except:
|
|
ext=None
|
|
if not ext:
|
|
# Use a generic extension
|
|
ext = '.bin'
|
|
filename = 'part-%03d%s' % (counter, ext)
|
|
fullpath=os.path.join(unpackdir, filename)
|
|
names.append(fullpath)
|
|
fp = open(fullpath, 'wb')
|
|
fp.write(part.get_payload(decode=True))
|
|
fp.close()
|
|
return names
|