Testing Primitive Operators with Python Unittest
From Dan Debrunner:
Here's an example of a test using Python unittest for the FTPReader operator.
It tests that FTPReader lists at least 18 entries on the FTP site and that every entry that is a file has a name ending in .zip, which matches the current contents of that site.
The test runs successfully in standalone mode and against the Streaming Analytics service. For distributed and Streaming Analytics service runs, the tester also automatically verifies that no PEs were restarted during the test. This catches cases where the correct result is returned even though errors caused automatic PE restarts.
import unittest

from streamsx.topology.topology import Topology
from streamsx.topology.tester import Tester
import streamsx.spl.op as op
import streamsx.spl.toolkit as tk


class TestFTP(unittest.TestCase):
    """ Test invocations of FTP operators with standalone """
    def setUp(self):
        Tester.setup_standalone(self)

    def _add_toolkits(self, topo):
        tk.add_toolkit(topo, '../tk')
        tk.add_toolkit(topo, '/home/streamsadmin/toolkits/com.ibm.streamsx.inet')

    def test_scan(self):
        """ Test scanning of a standard site. """
        topo = Topology()
        self._add_toolkits(topo)
        scan = op.Source(topo, "ftptest::FTPScanTest", 'ftptest::FileName_t')

        tester = Tester(topo)
        tester.tuple_count(scan.stream, 18, exact=False)
        tester.tuple_check(scan.stream, lambda fn : fn['fileName'].endswith('.zip') if fn['isFile'] else True)
        tester.test(self.test_ctxtype, self.test_config)


class TestFTPCloud(TestFTP):
    """ Test invocations of FTP operators with streaming analytics """
    def setUp(self):
        Tester.setup_streaming_analytics(self, force_remote_build=True)
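The condition passed to tuple_check is an ordinary Python callable, so it can be tried out without a Streams installation. The sketch below pulls the lambda from test_scan out into a named function and applies it to a few hand-written dictionaries. The name zip_if_file and the sample tuples are made up for illustration; they are not real output from the FTP site.

```python
def zip_if_file(tup):
    """Return True when a directory-listing tuple passes the check.

    Mirrors the lambda used in test_scan: entries that are files must
    have a name ending in '.zip'; any other entry (a directory) passes.
    """
    return tup['fileName'].endswith('.zip') if tup['isFile'] else True


# Hand-written sample tuples (illustrative only, not real site output).
samples = [
    {'fileName': '1MB.zip', 'isFile': True},
    {'fileName': 'upload', 'isFile': False},
    {'fileName': 'readme.txt', 'isFile': True},
]

results = [zip_if_file(t) for t in samples]
print(results)
```

Only the last sample is rejected, because it is a file whose name does not end in .zip.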
This Python test is based on the script example above but is not the same as it. The script test (I think) downloads a 1MB file and then checks that the output (which seems to be modified) is at least 2,000,000 bytes, so it does not seem to check the functionality of the FTPReader operator explicitly. It wouldn't be too hard to add another test case to this Python test to ensure that the data read from the file has the expected size. (I'm also using an older version of the inet toolkit.)
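One way the additional size check could look is a small factory that builds a tuple_check condition from an expected byte count. This is only a sketch under assumptions: make_size_checker is a hypothetical helper, it assumes a stream whose tuples carry a bytesTransferred attribute filled in by an actual file transfer (the scan test here only lists a directory), and the 1 MiB figure is an assumed size for 1MB.zip, not a verified one.

```python
def make_size_checker(expected_bytes):
    """Build a condition that could be handed to Tester.tuple_check.

    Hypothetical helper: the returned function accepts a tuple (as a
    dict) and passes only when exactly expected_bytes were transferred.
    """
    def check(tup):
        return tup['bytesTransferred'] == expected_bytes
    return check


# Assumed size for 1MB.zip (1 MiB); adjust to the real file size.
EXPECTED_SIZE = 1024 * 1024
size_ok = make_size_checker(EXPECTED_SIZE)

# Hand-written tuples standing in for operator output.
print(size_ok({'bytesTransferred': 1048576}))
print(size_ok({'bytesTransferred': 2000000}))
```

In a test it would be attached with something like tester.tuple_check(transfer.stream, size_ok), where transfer is a hypothetical source that invokes a file-transfer composite.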
The matching SPL file is a modified subset of the original SPL file, focusing on the FTPReader operator:
namespace ftptest;

use com.ibm.streamsx.inet.ftp::*;

type FileName_t = rstring fileName, uint64 size, rstring date, rstring user, boolean isFile, uint32 transferCount, uint32 failureCount, uint64 bytesTransferred, float64 speed;
type Error_t = rstring errorText, int32 error, uint32 transferCount, uint32 failureCount, uint64 bytesTransferred;

public composite FTPScanTest(output FilenameStream) {
    param
        expression<Protocol> $protocol : (Protocol)getSubmissionTimeValue("protocol", "ftp");
        expression<rstring> $host : getSubmissionTimeValue("host", "speedtest.tele2.net");
        expression<rstring> $path : getSubmissionTimeValue("path", "/");
        expression<rstring> $username : getSubmissionTimeValue("username", "anonymous");
        expression<rstring> $password : getSubmissionTimeValue("password", "anon@localhost");
        expression<rstring> $fileToTransfer : getSubmissionTimeValue("fileToTransfer", "1MB.zip");
        expression<boolean> $verbosity : (boolean)getSubmissionTimeValue("verbosity", "false");

    graph
        stream<int32 a> TriggerStream = Beacon() {
            param
                iterations: 1u;
        }

        //scan the remote directory
        (
            stream<FileName_t> FilenameStream as OUT
        ) = FTPReader(TriggerStream) {
            param
                protocol : $protocol;
                isDirReader : true;
                host : $host;
                path : $path;
                username : $username;
                password : $password;
                useEPSV : false;
                curlVerbose : $verbosity;
            output
                OUT :
                    fileName = FileName(),
                    size = FileSize(),
                    date = FileDate(),
                    user = FileUser(),
                    isFile = IsFile(),
                    transferCount = 0u, // TransferCount(),
                    failureCount = 0u, // TransferFailureCount(),
                    bytesTransferred = NoBytesTransferred(),
                    speed = TransferSpeed();
        }
}
It would be run with:
# All tests
python3 -m unittest test_ftp.py
# A single test
python3 -m unittest test_ftp.TestFTPCloud.test_scan