#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Auxiliary script used to allocate memory on guests.
@copyright: 2008-2009 Red Hat Inc.
@author: Jiri Zupka (jzupka@redhat.com)
"""
import os, array, sys, random, copy, tempfile, datetime, math
PAGE_SIZE = 4096  # machine page size, in bytes
TMPFS_OVERHEAD = 0.0022  # extra tmpfs space reserved per 1MB of written data


class MemFill(object):
    """
    Fills guest memory according to certain patterns.

    The memory is a file placed on a dedicated tmpfs mount; it is written
    and verified one PAGE_SIZE page at a time. Every operation reports its
    outcome on stdout with a line starting with "PASS:" or "FAIL:".
    """

    def __init__(self, mem, static_value, random_key):
        """
        Constructor of MemFill class.

        Mounts a tmpfs of mem MB (plus TMPFS_OVERHEAD) on a new temporary
        directory and opens the backing file on it. Problems are reported
        on stdout and leave the object without a backing file.

        @param mem: Amount of test memory in MB.
        @param static_value: Value used to fill all memory (0..255).
        @param random_key: Seed of random series used for fill up memory.
        """
        # Set first, so that __del__ also works on a partially built object.
        self.tmpdp = None
        self.f = None

        if static_value < 0 or static_value > 255:
            print("FAIL: Initialization static value "
                  "can be only in range (0..255)")
            return

        self.tmpdp = tempfile.mkdtemp()
        ret_code = os.system("mount -o size=%dM tmpfs %s -t tmpfs" %
                             ((mem + math.ceil(mem * TMPFS_OVERHEAD)),
                              self.tmpdp))
        if ret_code != 0:
            if os.getuid() != 0:
                print("FAIL: Unable to mount tmpfs "
                      "(likely cause: you are not root)")
            else:
                print("FAIL: Unable to mount tmpfs")
            return

        self.f = tempfile.TemporaryFile(prefix='mem', dir=self.tmpdp)
        self.allocate_by = 'L'
        # Floor division + int(): the page count is fed to range().
        self.npages = int((mem * 1024 * 1024) // PAGE_SIZE)
        self.random_key = random_key
        self.static_value = static_value
        print("PASS: Initialization")

    def __del__(self):
        """
        Close the backing file, unmount the tmpfs and remove its mount point.
        """
        backing_file = getattr(self, "f", None)
        if backing_file is not None:
            backing_file.close()

        mount_dir = getattr(self, "tmpdp", None)
        if mount_dir is None:
            return
        if os.path.ismount(mount_dir):
            os.system("umount %s" % (mount_dir))
        try:
            os.rmdir(mount_dir)
        except OSError:
            # Best effort: the directory is still mounted or already gone.
            pass

    @staticmethod
    def _item_bytes(page, index):
        """
        Return the raw bytes of one item of an array as a bytearray.

        @param page: array.array holding the item.
        @param index: Index of the item inside page.
        """
        item = page[index:index + 1]
        # array.tobytes() is the Python 3 name of array.tostring().
        serialize = getattr(item, "tobytes", None) or item.tostring
        return bytearray(serialize())

    @staticmethod
    def _special_index(page, n_bytes_on_end):
        """
        Draw the index of the item that carries the page number.

        NOTE: with n_bytes_on_end == PAGE_SIZE the lower bound is -1, which
        array indexing maps to the last item. The bounds are kept as they
        are because changing them would change the generated series.

        @param page: Reference page (only its itemsize is used).
        @param n_bytes_on_end: how many bytes on the end of page can be
                changed.
        """
        last_item = (PAGE_SIZE // page.itemsize) - 1
        return random.randint(last_item - (n_bytes_on_end // page.itemsize),
                              last_item)

    def compare_page(self, original, inmem):
        """
        Compare pages of memory and print the differences found.

        Positions are computed from the current offset of the backing file,
        so the page must be the one that was read last.

        @param original: Data that was expected to be in memory.
        @param inmem: Data in memory.
        """
        item_size = original.itemsize
        page_start = self.f.tell() - PAGE_SIZE
        for ip in range(PAGE_SIZE // item_size):
            if original[ip] == inmem[ip]:
                continue
            # The item is wrong; find which of its bytes differ.
            wanted_bytes = self._item_bytes(original, ip)
            found_bytes = self._item_bytes(inmem, ip)
            for ib, (wanted, found) in enumerate(zip(wanted_bytes,
                                                     found_bytes)):
                if wanted != found:
                    position = page_start + ip * item_size + ib
                    print("Mem error on position %d wanted 0x%Lx and is "
                          "0x%Lx" % (position, wanted, found))

    def value_page(self, value):
        """
        Create page filled by value.

        @param value: Byte value (0..255) we want to fill the page with.
        @return: array of bytes of size PAGE_SIZE; an empty array (after
                printing a FAIL line) when value does not fit into a byte.
        """
        try:
            return array.array("B", [value]) * PAGE_SIZE
        except (OverflowError, TypeError):
            print("FAIL: Value can be only in range (0..255)")
            return array.array("B")

    def random_page(self, seed):
        """
        Create page filled by static random series.

        @param seed: Seed of random series.
        @return: Static random array series.
        """
        random.seed(seed)
        page = array.array(self.allocate_by)
        # sys.maxint exists on Python 2 only; sys.maxsize is the fallback.
        upper = getattr(sys, "maxint", sys.maxsize)
        page.extend(random.randrange(0, upper)
                    for _ in range(PAGE_SIZE // page.itemsize))
        return page

    def value_fill(self, value=None):
        """
        Fill memory page by page, with value generated with value_page.

        Prints "PASS: Mem value fill" when done.

        @param value: Parameter to be passed to value_page. None to just use
                what's on the attribute static_value.
        """
        self.f.seek(0)
        if value is None:
            value = self.static_value
        page = self.value_page(value)
        if len(page) != PAGE_SIZE:
            # value_page already printed the FAIL line; nothing to write.
            return
        for _ in range(self.npages):
            page.tofile(self.f)
        print("PASS: Mem value fill")

    def value_check(self, value=None):
        """
        Check memory to see if data is correct.

        Prints "PASS: value verification" if data in memory is correct,
        else prints the differences of at most two pages followed by
        "FAIL: value verification".

        @param value: Parameter to be passed to value_page. None to just use
                what's on the attribute static_value.
        """
        self.f.seek(0)
        if value is None:
            value = self.static_value
        page = self.value_page(value)
        if len(page) != PAGE_SIZE:
            # value_page already printed the FAIL line; nothing to compare.
            return

        reports_left = 2  # bad pages that still get a detailed report
        failure = False
        for _ in range(self.npages):
            pf = array.array("B")
            try:
                pf.fromfile(self.f, PAGE_SIZE // pf.itemsize)
            except EOFError:
                # The file holds fewer pages than expected.
                failure = True
                break
            if page != pf:
                failure = True
                self.compare_page(page, pf)
                reports_left -= 1
                if reports_left == 0:
                    break
        if failure:
            print("FAIL: value verification")
        else:
            print("PASS: value verification")

    def static_random_fill(self, n_bytes_on_end=PAGE_SIZE):
        """
        Fill memory by page with static random series with added special value
        on random place in pages.

        Prints "PASS: filling duration = <n> ms" when done.

        @param n_bytes_on_end: how many bytes on the end of page can be changed.
        """
        self.f.seek(0)
        page = self.random_page(self.random_key)
        random.seed(self.random_key)
        p = copy.copy(page)
        t_start = datetime.datetime.now()
        for pages in range(self.npages):
            rand = self._special_index(page, n_bytes_on_end)
            p[rand] = pages  # mark the page with its own number
            p.tofile(self.f)
            p[rand] = page[rand]  # restore the reference content
        t_end = datetime.datetime.now()
        delta = t_end - t_start
        milisec = ((delta.days * 86400 + delta.seconds) * 1e3 +
                   delta.microseconds / 1e3)
        print("PASS: filling duration = %Ld ms" % milisec)

    def static_random_verify(self, n_bytes_on_end=PAGE_SIZE):
        """
        Check memory to see if it contains correct contents.

        Prints "PASS: Random series verification" if data in memory is
        correct, else prints the differences of at most two pages followed
        by "FAIL: Random series verification".

        @param n_bytes_on_end: how many bytes on the end of page can be
                changed; must match the value used for static_random_fill.
        """
        self.f.seek(0)
        page = self.random_page(self.random_key)
        random.seed(self.random_key)
        p = copy.copy(page)
        reports_left = 2  # bad pages that still get a detailed report
        failure = False
        for pages in range(self.npages):
            rand = self._special_index(page, n_bytes_on_end)
            p[rand] = pages
            pf = array.array(self.allocate_by)
            try:
                pf.fromfile(self.f, PAGE_SIZE // pf.itemsize)
            except EOFError:
                # The file holds fewer pages than expected.
                failure = True
                break
            if p != pf:
                failure = True
                self.compare_page(p, pf)
                reports_left -= 1
                if reports_left == 0:
                    break
            p[rand] = page[rand]
        if failure:
            print("FAIL: Random series verification")
        else:
            print("PASS: Random series verification")
def die():
    """
    Quit allocator.

    Raises SystemExit with exit status 0.
    """
    # sys.exit instead of exit(): the latter is only a helper injected by
    # the site module and is missing when the interpreter runs without it.
    sys.exit(0)
def main():
    """
    Main (infinite) loop of allocator.

    Prints "PASS: Start", then reads one line at a time from stdin and
    executes it as Python code. Names assigned by a command stay available
    to the following commands. The loop ends when a command sets the
    variable "end" to a true value or when stdin reaches end of file.
    """
    print("PASS: Start")
    try:
        read_command = raw_input  # Python 2
    except NameError:
        read_command = input  # Python 3
    # Explicit namespace shared by all commands; "end" is the stop flag.
    scope = {"end": False}
    while not scope.get("end"):
        try:
            command = read_command()
        except EOFError:
            # The controlling side closed stdin; stop instead of crashing.
            break
        # SECURITY: the line is executed verbatim as Python code. This is
        # only acceptable while stdin is fed by a trusted controller.
        exec(command, globals(), scope)
# Start the command loop only when the file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()