#! /usr/local/bin/python ''' name: form.py purpose: Form-handling features for CGI scripts version: 0.6 author: Andrew Clover licence: GPL ''' _version= 0.6 # ------- # Imports # ------- import os, sys, types, copy, string, re, whrandom, cStringIO # --------- # Constants # --------- # fdefs enumeration STRING, TEXT, ENUM, BOOL, LIST, MAP, FILE, INT, FLOAT= range(1,10) # DoS safeguards (off by default) limit_memory= 0 limit_list= 0 limit_file= 0 # i/o constants _CONTROLCHARS= '\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1A\x1B\x1C\x1D\x1E\x1F\x7F' _CONTROLCHARS_ALLOWNEWLINE= '\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0B\x0C\x0D\x0E\x0F\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1A\x1B\x1C\x1D\x1E\x1F\x7F' _SAFE= '_0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ' _UNSAFE= re.compile('[^a-zA-Z0-9_\xC0-\xFF\-]') _UNSAFEISH= re.compile('[^/.a-zA-Z0-9_\xC0-\xFF\-]') _UNSAFEISH_ISHBITS= re.compile('[./]{2}') _NULLTRANSLATION= string.maketrans('', '') _CRLF= '\x0D\x0A' _decU_re= re.compile('%'+'[0123456789abcdefABCDEF]'*2) _encU_re= re.compile('[%<>#"{}\[\]|\\^`;/?:@&=+,$\x01-\x20\x7F-\xFF]') _encH_re= re.compile('[&<>"\x7F-\xFF\x01-\x1F]') _encH_special= {'&': '&', '<': '<', '>': '>', '"': '"', '\xA0': ' '} _CHUNK= 1024*8 _MIME_CHUNK= 1024*2 # Boolean type class _False: def __nonzero__(self): return 0 def __int__(self): return 0 def __str__(self): return 'false' class _True: def __nonzero__(self): return 1 def __int__(self): return 1 def __str__(self): return 'true' false= _False() true= _True() def bool(x): if x: return true else: return false # ---------- # Exceptions # ---------- cgiError= 'form.cgiError' httpError= 'form.httpError' fdefError= 'form.fdefError' # -------------- # Initialisation # -------------- # initialise - you don't need to call this but it provides a quicker way to set some variables def initialise(version, memorylimit= 0, filelimit= 0, listlimit= 0, europe= false, *voida, 
**voidk): global limit_memory, limit_file, limit_list, sepChars, decChars limit_memory= memorylimit limit_file= filelimit limit_list= listlimit if europe: sepChars= ' .' decChars=',' else: sepChars= ' ,' decChars='.' if version>_version: raise NotImplementedError, 'form.py version '+str(version)+' required, only version '+str(_version)+' available' # ----------------------- # Input-reading functions # ----------------------- # All readX functions work by parsing the various input formats to get key/value pairs and then passing each such field to a # _FieldStore object. The _FieldStore object takes care of ensuring that the field conforms to the specifications in the fdefs that # it was initialised with, including what to do with multiple fields, file uploads and image-submit buttons. When all the fields have # been parsed, the _FieldStore's contents are returned as a dictionary. # readForm -- # This is a shell function that does not parse anything itself. It checks the environment to find the request method and encoding # type, and forwards to the relevant readX function def readForm(fdefs): if not os.environ.has_key('REQUEST_METHOD'): raise cgiError, 'Server environment variable REQUEST_METHOD not set' method= os.environ['REQUEST_METHOD'] # get requests: send the query string straight to UrlEncoded parser if method=='GET': return readUrlEncoded(fdefs, os.environ.get('QUERY_STRING', '')) # post requests: first work out what the form encoding type is elif method=='POST': if not os.environ.has_key('CONTENT_TYPE'): raise cgiError, 'Server environment variable CONTENT_TYPE not set' if not os.environ.has_key('CONTENT_LENGTH'): raise cgiError, 'Server environment variable CONTENT_LENGTH not set' try: length= int(os.environ['CONTENT_LENGTH']) except ValueError: raise cgiError, 'Server environment variable CONTENT_LENGTH not a valid integer' (contentType, contentPars)= _parseMimeHeader(os.environ['CONTENT_TYPE']) # url-encoded post: send the standard input stream through 
the UrlEncoded stream parser if contentType=='application/x-www-form-urlencoded': return readUrlEncodedStream(fdefs, sys.stdin, length) # form-data: get the boundary identifier from headers and send standard input through MIME parser elif contentType=='multipart/form-data': return readFormDataStream(fdefs, sys.stdin, length, contentPars) # unknown request method or type else: raise httpError, 'Content-type "'+contentType+'" not supported' else: raise httpError, 'Method "'+method+'" not supported' # readUrlEncoded -- # Take a url-encoded submission. Split into query fields. Separate keys from values. Send them to _FieldStore object. def readUrlEncoded(fdefs, query): fvals= _FieldStore(fdefs) if query!='': fields= re.split('[&;]', query) for field in fields: fieldPair= string.split(field, '=', 1) if len(fieldPair)!=2: raise httpError, 'Malformed query string - field "'+query+'" contained no "="' [fieldName, value]= map(decU, fieldPair) fvals.write(fieldName, string.replace(value, _CRLF, '\n')) return fvals.read() # readUrlEncodedStream -- # Read query string fields bit by bit from an input stream, making sure not to exceed DoS memory limitation def readUrlEncodedStream(fdefs, stream, length): # if we can fit the entire query string in the space allowed us by limit_memory, we can use the faster, non-streaming version if lengthminMax[1]: raise fdefError, 'Wrong number of parameters for fdef '+fieldName # copy the fdef to ourselves and initialise the field. self.fdefs[fieldName]= (ftype, fpars) if ftype==ENUM and len(fpars)>=2: self.fields[fieldName]= fpars[1] else: self.fields[fieldName]= copy.copy(self.default[ftype]) # fileDir checks to see if a field can accept a file upload and if so returns the name of the directory the file should end up in. def fileDir(self, fieldName): if self.fdefs.has_key(fieldName): if self.fdefs[fieldName][0]==FILE: return self.fdefs[fieldName][1][0] return None # write is called when a field is received. 
But the value given may be totally wrong for the type of the field. So it must be checked # and made to fit or thrown away. def write(self, fieldName, value): # If the field name is unknown, first check to see if it's got an imagemap suffix on it. If so, remember what the suffix was (ie. which # co-ordinate is being changed), and remove it to get the name. if not self.fdefs.has_key(fieldName): if fieldName[-2:] in ['.x', '.y']: postfix= fieldName[-1:] fieldName= fieldName[:-2] # If the field name is still unknown, try to split it into a name and value encoded into the name only goodvalue= false if not self.fdefs.has_key(fieldName): if ':' in fieldName: [fieldName, value]= string.split(fieldName, ':', 1) goodvalue= true # If the field name is still unknown, forget it if not self.fdefs.has_key(fieldName): return # Ignore blank values passed in except in the case of ENUM, where a blank non-default choice is useful, and in the case where # the value was obtained from a name split ftype= self.fdefs[fieldName][0] fpars= self.fdefs[fieldName][1] if value=='' and ftype!=ENUM and not goodvalue: return # if the field is a file upload field, check a file is actually expected; if not, chuck it if type(value)==types.TupleType: if ftype==FILE: self.fields[fieldName].append(value) return # make STRING type conform if ftype==STRING: fLength= 0 fExclude= _CONTROLCHARS if len(fpars)>0: fLength= fpars[0] if len(fpars)>1: fExclude= fExclude+fpars[1] value= string.replace(value, '\n', ' ') value= string.translate(value, _NULLTRANSLATION, fExclude) if fLength>0: value= value[:fLength] self.fields[fieldName]= value # make TEXT type conform elif ftype==TEXT: fLength= 0 if len(fpars)>0: fLength= fpars[0] value= _wrapRe.sub(_wrapTextarea, value) value= string.translate(value, _NULLTRANSLATION, _CONTROLCHARS_ALLOWNEWLINE) if fLength>0: value= value[:fLength] self.fields[fieldName]= value # make ENUM type conform elif ftype==ENUM: fValues= fpars[0] if value in fValues: self.fields[fieldName]= 
value # make BOOL type conform elif ftype==BOOL: self.fields[fieldName]= bool(value=='on') # make LIST type conform elif ftype==LIST: value= string.replace(value, _CRLF, ' ') value= string.translate(value, _NULLTRANSLATION, _CONTROLCHARS) self.fields[fieldName].append(value) # make MAP type conform elif ftype==MAP: (fClipX, fClipY)= (None, None) if len(fpars)>0: (fClipX, fClipY)= fpars[0] previous= self.fields[fieldName] if previous==(-1, -1): previous= (0, 0) try: value= int(value) except ValueError: value= 0 if postfix=='x': previous= (_inRange(value, 0, fClipX), previous[1]) if postfix=='y': previous= (previous[0], _inRange(value, 0, fClipY)) self.fields[fieldName]= previous # make INT type conform elif ftype==INT: try: value= _readInt(value) except ValueError: value= 0 self.fields[fieldName]= value # make FLOAT type conform elif ftype==FLOAT: try: value= _readFloat(value) except ValueError: value= 0.0 self.fields[fieldName]= value # read just returns the accumulated field values def read(self): return self.fields # _inRange -- # Trivial min/max function for clipping MAP positions def _inRange(value, min, max): if min!=None: if valuemax: return max return value # ------------------------ # Request-output functions # ------------------------ # all the writeX functions rely on _subFields(). This function splits the fields key as seen in the dictionary into single sub-fields # (there will be two sub-fields for a MAP and up to limit_list for a LIST or FILE). The subfields are returned to the supplied # handler, which converts to the desired format and sends them down a stream. The writeX calls that return a string turn the # stream into a string using StringIO. 
def writeForm(fvals):
    """Render every sub-field of fvals with _writeFormPart and return the result as a string."""
    buf = cStringIO.StringIO()
    writeFormStream(fvals, buf)
    rendered = buf.getvalue()
    buf.close()
    return rendered


def writeFormStream(fvals, stream):
    """Send every sub-field of fvals through _writeFormPart onto the given stream."""
    _subFields(stream, fvals, _writeFormPart)


def writeUrlEncoded(fvals):
    """Render every sub-field of fvals with _writeUrlEncodedPart and return the result as a string."""
    buf = cStringIO.StringIO()
    writeUrlEncodedStream(fvals, buf)
    rendered = buf.getvalue()
    buf.close()
    return rendered


def writeUrlEncodedStream(fvals, stream):
    """Send every sub-field of fvals through _writeUrlEncodedPart onto the given stream."""
    _subFields(stream, fvals, _writeUrlEncodedPart)


# The writeFormData calls are a bit more involved, because a boundary has to be chosen before the parts themselves
# can be created. For string output we can guess a boundary and build the string; if the data happens to contain that
# boundary, the result is thrown away and we try again with a new boundary. For stream output the data cannot be kept
# anywhere, so at least two passes are required: one to find a boundary and one to output. Both methods need to have
# the length of the final form-data prepended to the output in a Content-Length header, once we know what it is.
def writeFormData(fvals): finder= _BoundaryFinder() while true: finder.newBoundary() finder.outputStream= cStringIO.StringIO() _subFields(finder, fvals, _writeFormDataPart) finder.outputStream.write('--'+finder.boundary+'--') if finder.valid: break finder.outputStream.close() value= finder.outputStream.getvalue() finder.outputStream.close() return 'Content-Type: multipart/form-data; boundary="'+finder.boundary+'"'+_CRLF+'Content-Length: '+str(len(value))+_CRLF*2+value def writeFormDataStream(dict, stream): finder= _BoundaryFinder() counter= _lengthCounter() finder.outputStream= counter while not finder.valid: finder.newBoundary() _subFields(finder, fvals, _writeFormDataPart) finder.outputStream.write('--'+finder.boundary+'--') finder.outputStream= stream stream.write('Content-Type: multipart/form-data; boundary="'+finder.boundary+'"'+_CRLF) stream.write('Content-Length: '+str(counter.length)+_CRLF*2) _subFields(finder, fvals, _writeFormDataPart) # _lengthCounter -- # It looks like a stream, but it only counts the number of bytes it has been sent class _lengthCounter: def __init__(self): self.length= 0 def write(self, data): self.length= self.length+len(data) # _boundaryFinder -- # Can be written to like a stream, but sets a flag if the boundary string is seen at the start of a line. class _BoundaryFinder: def __init__(self): self.outputStream= None self.valid= false def newBoundary(self): self.boundary= randomSafeString(32) self.valid= true self.lineQueue= '' self.startOfLine= true def write(self, data): # we have been passed some data. Only bother to check it if we still need to. if self.valid: self.lineQueue= self.lineQueue+data while true: if self.startOfLine: if self.lineQueue[:len(self.boundary)+2]=='--'+self.boundary: self.valid= false break splitPoint= string.find(self.lineQueue, _CRLF) # if no CRLF in queue, throw away queue (except for last character which might be the first character of a CRLF sequence). 
# Otherwise, lose the line at the start of the queue and look at the next line in. if splitPoint==-1: self.lineQueue= self.lineQueue[-1:] break else: self.lineQueue= self.lineQueue[splitPoint+2:] # okay, we may also need to pass the data onto a destination stream too, even if the boundary is now invalid. if self.outputStream!=None: self.outputStream.write(data) # _subFields -- # Go through the items in a fvals dictionary, split into sub-fields where necessary and send the sub-fields to a nominated function # which will output to the stream. Some functions may need to know if this is the first sub-part in the entire query or not so this # is also provided. def _subFields(stream, fvals, f): firstPart= true for fieldName in fvals.keys(): value= fvals[fieldName] if type(value)==types.StringType: f(stream, fieldName, string.replace(value, '\n', _CRLF*2), firstPart) elif type(value)==types.ListType: for listItem in value: f(stream, fieldName, listItem, firstPart) elif type(value)==types.InstanceType: if value: f(stream, fieldName, 'on', firstPart) elif type(value)==types.IntType: f(stream, fieldName, str(value), firstPart) elif type(value)==types.FloatType: f(stream, fieldName, str(value), firstPart) elif type(value)==types.TupleType and len(value)==2: f(stream, fieldName+'.x', str(value[0]), firstPart) f(stream, fieldName+'.y', str(value[1]), false) elif type(value)==types.TupleType and len(value)==4: f(stream, fieldName, value, firstPart) else: raise fdefError, 'Unknown type of field in fvals dictionary' firstPart= false # _writeFormPart -- # encode field into control def _writeFormPart(stream, fieldName, value, firstPart): if type(value)==types.TupleType: raise fdefError, 'Hidden file-upload fields cannot be included in forms' stream.write('\n') # _writeUrlEncodedPart -- # encode field into key=value pair def _writeUrlEncodedPart(stream, fieldName, value, firstPart): if type(value)==types.TupleType: raise fdefError, 'File-upload fields cannot be included in 
URL-encoded query string' if not firstPart: stream.write('&') stream.write(encU(fieldName)) stream.write('=') stream.write(encU(value)) # _writeFormDataPart -- # encode normal or file-upload field into form-data def _writeFormDataPart(stream, fieldName, value, firstPart): stream.outputStream.write('--'+stream.boundary+_CRLF) stream.write('Content-Disposition: form-data; name="'+fieldName+'"') if type(value)==types.TupleType: stream.write('; filename="'+value[1]+'"'+_CRLF) stream.write('Content-Type: '+value[3]+_CRLF) stream.write(_CRLF) uploadFile= open(value[0], 'rb') while true: chunk= uploadFile.read(_CHUNK) if chunk=='': break stream.write(chunk) uploadFile.close() else: stream.write(_CRLF*2) stream.write(value) stream.write(_CRLF) # ---------------------- # MIME-parsing functions # ---------------------- # Sadly we can't just palm this task off to the standard mimetools library, because we need to be able to stop parsing after a # certain number of bytes (that is, Content-Length). Attempting to read further may cause the process to halt, waiting for # input that will never come. # Also mimetools doesn't parse quoted parameters, as well. 
# _parseMimeHeader -- # Turn a MIME "Value; parameter=value"-style header value into main-value and a dictionary of parameters (keys lower-case) def _parseMimeHeader(header): headerParts= map(string.strip, string.split(header, ';')) headerMain= string.lower(headerParts[0]) headerPars= {} for parameter in headerParts[1:]: parameterParts= map(string.strip, string.split(parameter, '=', 1)) key= string.lower(parameterParts[0]) value= '' if len(parameterParts)>1: value= parameterParts[1] if len(value)>=2 and value[0]=='"' and value[-1]=='"': value= value[1:-1] headerPars[key]= value return (headerMain, headerPars) # _parseMimeHeaders -- # Parse a string containing a MIME (RFC822) header block into a dictionary of lower-case header lines and their values def _parseMimeHeaders(headerBlock): headers= string.split(headerBlock, '\n') currentHeader= '' dict= {} for header in headers: if header[0:0] in [' ', '\t']: if currentHeader=='': raise httpError, 'Malformed headers in multipart POST request body part' else: dict[currentHeader]= dict[currentHeader]+' '+string.strip(header) else: headerParts= string.split(header, ':', 1) if len(headerParts)!=2: raise httpError, 'Malformed headers in multipart POST request body part' currentHeader= string.lower(headerParts[0]) dict[currentHeader]= string.strip(headerParts[1]) return dict # _parseMimeMultipart -- # Use the _mimeStream class to read in multipart data and call a supplied function back with the body parts. 
def _parseMimeMultipart(stream, dispositionPars, f, fArgs): if not dispositionPars.has_key('boundary'): raise httpError, 'Multipart MIME input has no separating boundary' stream.pushBoundary(_CRLF+'--'+dispositionPars['boundary']+'--'+_CRLF) stream.pushBoundary(_CRLF+'--'+dispositionPars['boundary']+_CRLF) stream.popBoundary() while not stream.atBoundary(): stream.pushBoundary(_CRLF+'--'+dispositionPars['boundary']+_CRLF) # start of subpart: push a blank line boundary so we can read the headers only stream.pushBoundary(_CRLF+_CRLF) if limit_memory==0: headers= stream.read() else: headers= stream.read(limit_memory) stream.popBoundary() # part body: forward the stream to the client function. f(stream, _parseMimeHeaders(headers), fArgs) stream.popBoundary() # end of multipart stream.popBoundary() def _readable(x): # debug x= string.replace(x, _CRLF, '\\') if len(x)>20: return '"'+x[:8]+'...'+x[-8:]+'"' else: return '"'+x+'"' # _MimeStream -- # This class provides the bare bones of a stream interface. It sits around an input stream and: # - for non-binary parts, converts CRLF newlines to a simple '\n' # - handles boundaries, in a similar way to Python's multifile, except it won't read more than content-length # It may also decode known Content-Transfer-Encodings in the future, who knows, eh. # boundaries must not be larger than _MIME_CHUNK otherwise deadlocks can occur class _MimeStream: def __init__(self, stream, length): self.stream= stream self.length= length self.boundaries= [] # internally, _MimeStream maintains two queues. One full of data ready to be output, and one full of data not yet looked at, which # may contain boundaries. Input is chomped from the input stream into the input queue when this queue is too short to check for # current boundaries, and squirted, boundaryless, to the output queue, when more output is required. No CRLF conversion is done # until the output queue is finally read(). 
self.inputQueue= _CRLF self.outputQueue= '' self.atEnd= false # pushBoundary: add a boundary to the stack of lines that will stop output def pushBoundary(self, boundary): self.boundaries.append(boundary) # popBoundary: jump to end of current boundary and lose that boundary def popBoundary(self): while true: which= self.whichBoundary() if which!=None: if which!=-1: if which==len(self.boundaries)-1: self.inputQueue= self.inputQueue[len(self.boundaries[-1]):] self.boundaries[-1:]= [] break self.squirt() self.outputQueue= '' self.chomp() # atBoundary: are we at a boundary? def atBoundary(self): return self.whichBoundary()!=None # whichBoundary: which boundary are we at, or -1 for real EOF, or None at all? def whichBoundary(self): if self.outputQueue!='': return None self.chomp() if self.inputQueue=='': return -1 for i in range(len(self.boundaries)): if self.inputQueue[:len(self.boundaries[i])]==self.boundaries[i]: return i return None # chomp: fill the input queue with data from the input stream, also add a CRLF at the end of the file to cheat def chomp(self): appendLength= min(_MIME_CHUNK-len(self.inputQueue), self.length) if appendLength==0: queueAppend= '' else: queueAppend= self.stream.read(appendLength) self.length= self.length-len(queueAppend) self.inputQueue= self.inputQueue+queueAppend if self.length==0 and not self.atEnd: self.inputQueue= self.inputQueue+_CRLF self.atEnd= true # squirt: move as much data from the input queue to the output queue as possible def squirt(self): if self.length==0: nearestBoundaryIndex= len(self.inputQueue) else: nearestBoundaryIndex= len(self.inputQueue)-(max(map(len, self.boundaries)+[1])-1) for boundary in self.boundaries: boundaryIndex= string.find(self.inputQueue, boundary) if boundaryIndex!=-1 and boundaryIndex=appendLength: result= result+self.outputQueue[:appendLength] self.outputQueue= self.outputQueue[appendLength:] else: result= result+self.outputQueue[:newLineIndex+2] self.outputQueue= self.outputQueue[newLineIndex+2:] 
break return result # ----------------- # Utility functions # ----------------- # checked, selected, _on -- # Return blank string or 'checked'/'selected'/'on'. Shorthand for writing HTML, in the absence of a ?/: operator in Python. def checked(condition): if condition: return ' checked' else: return '' def selected(condition): if condition: return ' selected' else: return '' def _on(condition): if condition: return 'on' else: return '' # encH -- # Escape HTML-special characters. Uses &#xx; notation for characters except for &, <, >, " and hard-space. def _encH_char(charMatch): x= charMatch.group(0) if _encH_special.has_key(x): return _encH_special[x] else: return '&#'+str(ord(x))+';' def encH(text): return _encH_re.sub(_encH_char, text) def encHU(text): return encH(encU(text)) # encU, decU -- # Escape URL-special characters and decode them again def _encHex(x): if x<16: return '0'+'0123456789ABCDEF'[ord(x)] else: return '0123456789ABCDEF'[int(ord(x)/16)]+'0123456789ABCDEF'[int(ord(x)%16)] def _decHex(x): x= string.lower(x) try: return chr(string.index('0123456789abcdef', x[0])*16+string.index('0123456789abcdef', x[1])) except ValueError: return '' def _encU_char(charMatch): x= charMatch.group(0) return '%'+_encHex(x) def encU(text): return _encU_re.sub(_encU_char, text) def _decU_char(charMatch): x= charMatch.group(0) return _decHex(x[1:]) def decU(url): return string.replace(_decU_re.sub(_decU_char, url), '+', ' ') # randomSafeString -- # Come up with an identifier of a specified length made up only of letters, numbers and underscore def randomSafeString(length): safe= '' for i in range(length): safe= safe+whrandom.choice(_SAFE) return safe # makeSafe -- # Remove potentially dangerous characters from a string and make sure it is not null-string def makeSafe(x): x= _UNSAFE.sub('', x) if x=='': return '_' else: return x # makeSafeish -- # As makeSafe but allow single / and . 
def makeSafeish(x):
    """Return x with every character matched by _UNSAFEISH removed and every pair matched by
    _UNSAFEISH_ISHBITS replaced by '_'; an empty result becomes '_'."""
    cleaned = _UNSAFEISH.sub('', x)
    cleaned = _UNSAFEISH_ISHBITS.sub('_', cleaned)
    return cleaned or '_'


# ------------------------
# Number-reading functions
# ------------------------

# number separator/decimal point characters, user-settable for different territories
sepChars = " ,"
decChars = "."
_digits = "0123456789"


def _readInt(x):
    """Parse x with _readFloat and truncate to an integer.

    Returns sys.maxint if the conversion overflows. Raises ValueError (from
    _readFloat) for malformed input.
    """
    try:
        return int(_readFloat(x))
    except OverflowError:
        return sys.maxint


def _readFloat(x):
    """Parse an unsigned decimal number, returning a float.

    Characters in sepChars are ignored wherever they appear; a character in
    decChars marks the decimal point. Input containing no digits at all
    (including '') yields 0.0.

    Raises ValueError if a second decimal point is found or if any character
    is not a digit, a decimal point or a separator.
    """
    canonical = []
    seenDigit = 0
    seenPoint = 0
    for c in x:
        if c in _digits:
            canonical.append(c)
            seenDigit = 1
        elif c in decChars:
            if seenPoint:
                raise ValueError('More than one decimal point')
            canonical.append('.')
            seenPoint = 1
        elif c not in sepChars:
            raise ValueError('Invalid character in number')
    if not seenDigit:
        return 0.0
    # A single float() conversion of the canonical text gives the correctly rounded value; adding the fractional
    # digits one at a time accumulates rounding error (e.g. '0.15' came out as 0.15000000000000002).
    return float(''.join(canonical))