diff --git a/.gitignore b/.gitignore
index 2bef6a3..9888632 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,3 +11,5 @@ coverage.xml
 dist/
 .venv
 .dir-locals.el
+private_configs/
+logs/
diff --git a/README.rst b/README.rst
index 944a013..f3dbe28 100644
--- a/README.rst
+++ b/README.rst
@@ -1,22 +1,49 @@
-Scripts for generating and loading test xAPI events
-***************************************************
-
-|pypi-badge| |ci-badge| |codecov-badge| |doc-badge| |pyversions-badge|
-|license-badge| |status-badge|
-
+Scripts for generating Aspects xAPI events
+******************************************
 Purpose
 =======
+This package generates a variety of test data used for integration and
+performance testing of Open edX Aspects. Currently it populates the following
+datasets:
 
-Some test scripts to help make apples-to-apples comparisons of different
-database backends for xAPI events. Supports direct database connections to
-ClickHouse, and batch loading data to the Ralph Learning Record Store with the
-ClickHouse backend. It also can create gzipped CSV files for bulk import to
-other databases.
+- xAPI statements, simulating those generated by event-routing-backends
+- Course and learner data, simulating that generated by event-sink-clickhouse
 
-xAPI events generated match the specifications of the Open edX
+The xAPI events generated match the current specifications of the Open edX
 event-routing-backends package, but are not yet maintained to advance alongside
-them.
+them, so they may be expected to fall out of sync over time. Almost all current
+statements are simulated, but statements that are not yet used in Aspects
+reporting have been skipped.
+
+Features
+========
+Once an appropriate database has been created using Aspects, data can be
+generated in the following ways:
+
+Ralph to ClickHouse
+-------------------
+Useful for testing configuration, integration, and permissions, this uses batch
+POSTs to Ralph for xAPI statements, but still writes directly to ClickHouse for
+course and actor data. This is the slowest method, but exercises the largest
+surface area of the project.
+
+Direct to ClickHouse
+--------------------
+Useful for getting a medium to large amount of data into the database to test
+configuration and view reports. xAPI statements are batched; other data is
+currently inserted one row at a time.
+
+CSV files
+---------
+Useful for creating datasets that can be reused for checking performance
+changes with the exact same data, and for extremely large tests. The files can
+be generated locally or on any service supported by smart_open. They can then
+optionally be imported to ClickHouse if written locally or to S3. They can also
+be directly imported from S3 to ClickHouse at any time using the
+``load-db-from-s3`` subcommand. This is by far the fastest method for large
+scale tests.
+
 Getting Started
 ===============
 
@@ -24,13 +51,187 @@ Getting Started
 Usage
 -----
 
-Details of how to run the current version of the script can be found by
-executing:
+A configuration file is required to run a test. If no file is given, a small
+test will be run using the ``default_config.yaml`` included in the project:
 
 ::
 
-    ❯ xapi-db-load --help
+    ❯ xapi-db-load load-db
+
+To specify a config file:
+
+::
+
+    ❯ xapi-db-load load-db --config_file private_configs/my_huge_test.yaml
+
+There is also a sub-command for just performing a load of previously generated
+CSV data from S3:
+
+::
+
+    ❯ xapi-db-load load-db-from-s3 --config_file private_configs/my_s3_test.yaml
+
+
+Configuration Format
+--------------------
+There are a number of different configuration options for tuning the output.
+In addition to the documentation below, there are example settings files to
+review in the ``example_configs`` directory.
+
+Common Settings
+^^^^^^^^^^^^^^^
+These settings apply to all backends, and determine the size and makeup of the
+test::
+
+    # Location where timing logs will be saved
+    log_dir: logs
+
+    # xAPI statements will be generated in batches; the total number of
+    # statements is ``num_batches * batch_size``. The batch size is the number
+    # of statements sent to the backend (Ralph POST, ClickHouse insert, etc.)
+    num_batches: 3
+    batch_size: 100
+
+    # Overall start and end date for the entire run. All xAPI statements
+    # will fall within these dates. Different courses will have different start
+    # and end dates between these days, based on course_length_days below.
+    start_date: 2014-01-01
+    end_date: 2023-11-27
+
+    # All courses will be this long; they will be fit between start_date and
+    # end_date, therefore this must be less than end_date - start_date days.
+    course_length_days: 120
+
+    # The number of organizations; courses will be evenly spread among these
+    num_organizations: 3
+
+    # The number of learners to create; random subsets of these will be
+    # "registered" for each course and have statements generated for them
+    # between their registration date and the end of the course
+    num_actors: 10
+
+    # How many of each size course to create. The sum of these is the total
+    # number of courses created for the test. The keys are arbitrary, you can
+    # name them whatever you like and have as many or few sizes as you like.
+    # The keys must exactly match the definitions in course_size_makeup below.
+    num_course_sizes:
+      small: 1
+      medium: 1
+      ...
+
+    # Course size configurations, how many of each type of object are created
+    # for each course of this size. "actors" must be less than or equal to
+    # "num_actors". Keys here must exactly match the keys in num_course_sizes.
+    course_size_makeup:
+      small:
+        actors: 5
+        problems: 20
+        videos: 10
+        chapters: 3
+        sequences: 10
+        verticals: 20
+        forum_posts: 20
+      medium:
+        actors: 7
+        problems: 40
+        videos: 20
+        chapters: 4
+        sequences: 20
+        verticals: 30
+        forum_posts: 40
+      ...
+
+CSV Backend, Local Files
+^^^^^^^^^^^^^^^^^^^^^^^^
+Generates gzipped CSV files to a local directory::
+
+    backend: csv_file
+    csv_output_destination: logs/
+
+CSV Backend, S3 Compatible Destination
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+Generates gzipped CSV files to a remote location::
+
+    backend: csv_file
+    # This can be anything smart_open can handle (a local directory, an S3
+    # bucket, etc.), but importing to ClickHouse using this tool only
+    # supports S3 or compatible services like MinIO right now.
+    # Note that this *must* be an s3:// link; https links will not work.
+    # https://pypi.org/project/smart-open/
+    csv_output_destination: s3://openedx-aspects-loadtest/logs/large_test/
+
+    # These settings are shared with the ClickHouse backend
+    s3_key:
+    s3_secret:
+
+CSV Backend, S3 Compatible Destination, Load to ClickHouse
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+Generates gzipped CSV files to a remote location, then automatically loads
+them to ClickHouse::
+
+    backend: csv_file
+    # csv_output_destination can be anything smart_open can handle (a local
+    # directory, an S3 bucket, etc.), but importing to ClickHouse using this
+    # tool only supports S3 or compatible services (ex: MinIO) right now
+    # https://pypi.org/project/smart-open/
+    csv_output_destination: s3://openedx-aspects-loadtest/logs/large_test/
+    csv_load_from_s3_after: true
+
+    # Note that this *must* be an https link; s3:// links will not work, and
+    # it must point to the same location as csv_output_destination.
+    s3_source_location: https://openedx-aspects-loadtest.s3.amazonaws.com/logs/large_test/
+
+    # This also requires all of the ClickHouse backend variables!
+
+ClickHouse Backend
+^^^^^^^^^^^^^^^^^^
+This backend is only necessary if you are writing directly to ClickHouse; for
+the Ralph or CSV integrations, use their ``backend`` values instead::
+
+    backend: clickhouse
+
+Variables necessary to connect to ClickHouse, whether directly, through Ralph, or
+as part of loading CSV files::
+
+    # ClickHouse connection variables
+    db_host: localhost
+    # db_port is also used to determine the "secure" parameter. If the port
+    # ends in 443 or 440, the "secure" flag will be set on the connection.
+    db_port: 8443
+    db_username: ch_admin
+    db_password: secret
+
+    # Schema name for the xAPI schema
+    db_name: xapi
+
+    # Schema name for the event sink schema
+    db_event_sink_name: event_sink
+
+    # These S3 settings are shared with the CSV backend, but passed to
+    # ClickHouse when loading files from S3
+    s3_key: <...>
+    s3_secret: <...>
+
+Ralph / ClickHouse Backend
+^^^^^^^^^^^^^^^^^^^^^^^^^^
+Variables necessary to send xAPI statements via Ralph::
+
+    backend: ralph_clickhouse
+    lrs_url: http://ralph.tutor-nightly-local.orb.local/xAPI/statements
+    lrs_username: ralph
+    lrs_password: secret
+
+    # This also requires all of the ClickHouse backend variables!
+
+Load from S3 configuration
+^^^^^^^^^^^^^^^^^^^^^^^^^^
+Variables necessary to run ``xapi-db-load load-db-from-s3``, which skips the
+event generation process and just loads pre-existing CSV files from S3::
+
+    # Note that this must be an https link; s3:// links will not work
+    s3_source_location: https://openedx-aspects-loadtest.s3.amazonaws.com/logs/large_test/
+
+    # This also requires all of the ClickHouse backend variables!
 
 Developing
 ----------
@@ -162,29 +363,3 @@ Reporting Security Issues
 *************************
 
 Please do not report security issues in public. Please email security@openedx.org.
-
-.. |pypi-badge| image:: https://img.shields.io/pypi/v/xapi-db-load.svg
-    :target: https://pypi.python.org/pypi/xapi-db-load/
-    :alt: PyPI
-
-.. |ci-badge| image:: https://github.com/openedx/xapi-db-load/workflows/Python%20CI/badge.svg?branch=main
-    :target: https://github.com/openedx/xapi-db-load/actions
-    :alt: CI
-
-.. |codecov-badge| image:: https://codecov.io/github/openedx/xapi-db-load/coverage.svg?branch=main
-    :target: https://codecov.io/github/openedx/xapi-db-load?branch=main
-    :alt: Codecov
-
-.. 
|doc-badge| image:: https://readthedocs.org/projects/xapi-db-load/badge/?version=latest - :target: https://xapi-db-load.readthedocs.io/en/latest/ - :alt: Documentation - -.. |pyversions-badge| image:: https://img.shields.io/pypi/pyversions/xapi-db-load.svg - :target: https://pypi.python.org/pypi/xapi-db-load/ - :alt: Supported Python versions - -.. |license-badge| image:: https://img.shields.io/github/license/openedx/xapi-db-load.svg - :target: https://github.com/openedx/xapi-db-load/blob/main/LICENSE.txt - :alt: License - -.. |status-badge| image:: https://img.shields.io/badge/Status-Experimental-yellow diff --git a/default_config.yaml b/default_config.yaml new file mode 100644 index 0000000..0944577 --- /dev/null +++ b/default_config.yaml @@ -0,0 +1,100 @@ +# CSV backend configuration +# ######################### +backend: csv_file +# This can be anything smart_open can handle, a local directory or +# an S3 bucket, etc., but importing to ClickHouse only supports S3 right now +# https://pypi.org/project/smart-open/ +csv_output_destination: logs/ # s3://openedx-aspects-loadtest/logs/large_test/ +csv_load_from_s3_after: false + +# ClickHouse Backend configuration +# ################################ +# backend: clickhouse +# db_host: localhost +# db_port: 8443 +# db_name: xapi_lt +# db_event_sink_name: event_sink +# db_username: ch_admin +# db_password: +# s3_key: +# s3_secret: + + +# Ralph / ClickHouse backend configuration +# ######################################## +# backend: ralph_clickhouse +# db_host: localhost +# db_port: null +# db_name: xapi +# db_username: ch_admin +# db_password: 7NRe69D4zWWT0rf2G7gWa7RB +# lrs_url: http://ralph.tutor-nightly-local.orb.local/xAPI/statements +# lrs_username: ralph +# lrs_password: sdtiqjqwixhzcboqzbiryrulzcpvfmsfvqqw + +# Load from S3 configuration +# ########################## +# s3_source_location: https://openedx-aspects-loadtest.s3.amazonaws.com/logs/large_test/ + +# Run options +log_dir: logs +num_batches: 3 +batch_size: 100 + +# Overall start and end date for the entire run +start_date: 2014-01-01 +end_date: 2023-11-27 + +# All courses will be this long, and be fit into the start / end dates +# This must be less than end_date - start_date days. +course_length_days: 120 + +# The size of the test +num_organizations: 3 +num_actors: 10 + +# How many of each size course to create. The sum of these is the total number +# of courses created for the test. +num_course_sizes: + small: 1 + medium: 1 + large: 1 + huge: 1 + +# Course size configurations, how many of each type of object are created for +# each course of this size. "actors" must be less than or equal to "num_actors". +# For a course of this size to be created it needs to exist both here and in +# "num_course_sizes". 
+course_size_makeup: + small: + actors: 5 + problems: 20 + videos: 10 + chapters: 3 + sequences: 10 + verticals: 20 + forum_posts: 20 + medium: + actors: 7 + problems: 40 + videos: 20 + chapters: 4 + sequences: 20 + verticals: 30 + forum_posts: 40 + large: + actors: 10 + problems: 80 + videos: 30 + chapters: 5 + sequences: 40 + verticals: 80 + forum_posts: 200 + huge: + actors: 10 + problems: 160 + videos: 40 + chapters: 10 + sequences: 50 + verticals: 100 + forum_posts: 1000 diff --git a/example_configs/clickhouse_example.yaml b/example_configs/clickhouse_example.yaml new file mode 100644 index 0000000..70884d5 --- /dev/null +++ b/example_configs/clickhouse_example.yaml @@ -0,0 +1,74 @@ +# ClickHouse Backend configuration +# ################################ +backend: clickhouse +db_host: localhost +db_port: 8443 +db_name: xapi_lt +db_event_sink_name: event_sink +db_username: ch_admin +db_password: test +s3_key: ... +s3_secret: ... + +# Run options +log_dir: logs +num_batches: 3 +batch_size: 100 + +# Overall start and end date for the entire run +start_date: 2014-01-01 +end_date: 2023-11-27 + +# All courses will be this long, and be fit into the start / end dates +# This must be less than end_date - start_date days. +course_length_days: 120 + +# The size of the test +num_organizations: 3 +num_actors: 10 + +# How many of each size course to create. The sum of these is the total number +# of courses created for the test. +num_course_sizes: + small: 1 + medium: 1 + large: 1 + huge: 1 + +# Course size configurations, how many of each type of object are created for +# each course of this size. "actors" must be less than or equal to "num_actors". +# For a course of this size to be created it needs to exist both here and in +# "num_course_sizes". +course_size_makeup: + small: + actors: 5 + problems: 20 + videos: 10 + chapters: 3 + sequences: 10 + verticals: 20 + forum_posts: 20 + medium: + actors: 7 + problems: 40 + videos: 20 + chapters: 4 + sequences: 20 + verticals: 30 + forum_posts: 40 + large: + actors: 10 + problems: 80 + videos: 30 + chapters: 5 + sequences: 40 + verticals: 80 + forum_posts: 200 + huge: + actors: 10 + problems: 160 + videos: 40 + chapters: 10 + sequences: 50 + verticals: 100 + forum_posts: 1000 diff --git a/example_configs/large_csv_load.yaml b/example_configs/large_csv_load.yaml new file mode 100644 index 0000000..1723d81 --- /dev/null +++ b/example_configs/large_csv_load.yaml @@ -0,0 +1,81 @@ +# Large data load +# ######################### +# Took about 30 hrs to run- 24 to generate the CSVs, 5.5 to import to ClickHouse +# 1 billion statements +# 2 million learners +# 1200 courses +# 10 orgs + +backend: csv_file +# The next two lines point to the same place, but each needs a different URL +# format. +csv_output_destination: s3://openedx-aspects-loadtest/logs/large_test/ +s3_source_location: https://openedx-aspects-loadtest.s3.amazonaws.com/logs/large_test/ +csv_load_from_s3_after: true + +db_host: localhost +db_port: 8443 +db_name: xapi_lt +db_event_sink_name: event_sink +db_username: ch_admin +db_password: test +s3_key: ... +s3_secret: ... + +# Run options +log_dir: logs +num_batches: 100000 +batch_size: 10000 + +# Overall start and end date for the entire run +start_date: 2014-01-01 +end_date: 2023-11-27 + +# All courses will be this long, and be fit into the start / end dates +# This must be less than end_date - start_date days. 
+course_length_days: 120 + +# The size of the test +num_organizations: 10 +num_actors: 2000000 + +# The sum of these is the total number of courses created for the test +num_course_sizes: + small: 100 + medium: 400 + large: 480 + huge: 120 + +course_size_makeup: + small: + actors: 30 + problems: 20 + videos: 10 + chapters: 3 + sequences: 10 + verticals: 20 + forum_posts: 20 + medium: + actors: 500 + problems: 40 + videos: 20 + chapters: 4 + sequences: 20 + verticals: 30 + forum_posts: 40 + large: + actors: 5000 + problems: 80 + videos: 30 + chapters: 5 + sequences: 40 + verticals: 80 + forum_posts: 200 + huge: + actors: 20000 + problems: 160 + videos: 40 + chapters: 10 + sequences: 50 + verticals: 100 + forum_posts: 1000 diff --git a/example_configs/local_csv.yaml b/example_configs/local_csv.yaml new file mode 100644 index 0000000..031a295 --- /dev/null +++ b/example_configs/local_csv.yaml @@ -0,0 +1,67 @@ +# CSV backend configuration +# ######################### +backend: csv_file +csv_output_destination: logs + +# Run options +log_dir: logs +num_batches: 3 +batch_size: 100 + +# Overall start and end date for the entire run +start_date: 2014-01-01 +end_date: 2023-11-27 + +# All courses will be this long, and be fit into the start / end dates +# This must be less than end_date - start_date days. +course_length_days: 120 + +# The size of the test +num_organizations: 3 +num_actors: 10 + +# How many of each size course to create. The sum of these is the total number +# of courses created for the test. +num_course_sizes: + small: 1 + medium: 1 + large: 1 + huge: 1 + +# Course size configurations, how many of each type of object are created for +# each course of this size. "actors" must be less than or equal to "num_actors". +# For a course of this size to be created it needs to exist both here and in +# "num_course_sizes". +course_size_makeup: + small: + actors: 5 + problems: 20 + videos: 10 + chapters: 3 + sequences: 10 + verticals: 20 + forum_posts: 20 + medium: + actors: 7 + problems: 40 + videos: 20 + chapters: 4 + sequences: 20 + verticals: 30 + forum_posts: 40 + large: + actors: 10 + problems: 80 + videos: 30 + chapters: 5 + sequences: 40 + verticals: 80 + forum_posts: 200 + huge: + actors: 10 + problems: 160 + videos: 40 + chapters: 10 + sequences: 50 + verticals: 100 + forum_posts: 1000 diff --git a/example_configs/ralph_clickhouse_example.yaml b/example_configs/ralph_clickhouse_example.yaml new file mode 100644 index 0000000..70e2172 --- /dev/null +++ b/example_configs/ralph_clickhouse_example.yaml @@ -0,0 +1,74 @@ +# Ralph / ClickHouse backend configuration +# ######################################## +backend: ralph_clickhouse +db_host: localhost +db_port: null +db_name: xapi +db_username: ch_admin +db_password: test +lrs_url: http://ralph.tutor-nightly-local.orb.local/xAPI/statements +lrs_username: ralph +lrs_password: test + +# Run options +log_dir: logs +num_batches: 3 +batch_size: 100 + +# Overall start and end date for the entire run +start_date: 2014-01-01 +end_date: 2023-11-27 + +# All courses will be this long, and be fit into the start / end dates +# This must be less than end_date - start_date days. +course_length_days: 120 + +# The size of the test +num_organizations: 3 +num_actors: 10 + +# How many of each size course to create. The sum of these is the total number +# of courses created for the test. +num_course_sizes: + small: 1 + medium: 1 + large: 1 + huge: 1 + +# Course size configurations, how many of each type of object are created for +# each course of this size. 
"actors" must be less than or equal to "num_actors". +# For a course of this size to be created it needs to exist both here and in +# "num_course_sizes". +course_size_makeup: + small: + actors: 5 + problems: 20 + videos: 10 + chapters: 3 + sequences: 10 + verticals: 20 + forum_posts: 20 + medium: + actors: 7 + problems: 40 + videos: 20 + chapters: 4 + sequences: 20 + verticals: 30 + forum_posts: 40 + large: + actors: 10 + problems: 80 + videos: 30 + chapters: 5 + sequences: 40 + verticals: 80 + forum_posts: 200 + huge: + actors: 10 + problems: 160 + videos: 40 + chapters: 10 + sequences: 50 + verticals: 100 + forum_posts: 1000 diff --git a/example_configs/s3_csv.yaml b/example_configs/s3_csv.yaml new file mode 100644 index 0000000..9e11629 --- /dev/null +++ b/example_configs/s3_csv.yaml @@ -0,0 +1,71 @@ +# CSV backend configuration +# ######################### +backend: csv_file +# This can be anything smart_open can handle, a local directory or +# an S3 bucket, etc., but importing to ClickHouse only supports S3 right now +# https://pypi.org/project/smart-open/ +csv_output_destination: s3://openedx-aspects-loadtest/logs/large_test/ +csv_load_from_s3_after: false + +# Run options +log_dir: logs +num_batches: 3 +batch_size: 100 + +# Overall start and end date for the entire run +start_date: 2014-01-01 +end_date: 2023-11-27 + +# All courses will be this long, and be fit into the start / end dates +# This must be less than end_date - start_date days. +course_length_days: 120 + +# The size of the test +num_organizations: 3 +num_actors: 10 + +# How many of each size course to create. The sum of these is the total number +# of courses created for the test. +num_course_sizes: + small: 1 + medium: 1 + large: 1 + huge: 1 + +# Course size configurations, how many of each type of object are created for +# each course of this size. "actors" must be less than or equal to "num_actors". +# For a course of this size to be created it needs to exist both here and in +# "num_course_sizes". +course_size_makeup: + small: + actors: 5 + problems: 20 + videos: 10 + chapters: 3 + sequences: 10 + verticals: 20 + forum_posts: 20 + medium: + actors: 7 + problems: 40 + videos: 20 + chapters: 4 + sequences: 20 + verticals: 30 + forum_posts: 40 + large: + actors: 10 + problems: 80 + videos: 30 + chapters: 5 + sequences: 40 + verticals: 80 + forum_posts: 200 + huge: + actors: 10 + problems: 160 + videos: 40 + chapters: 10 + sequences: 50 + verticals: 100 + forum_posts: 1000 diff --git a/example_configs/s3_csv_clickhouse.yaml b/example_configs/s3_csv_clickhouse.yaml new file mode 100644 index 0000000..8da1101 --- /dev/null +++ b/example_configs/s3_csv_clickhouse.yaml @@ -0,0 +1,86 @@ +# CSV backend configuration +# ######################### +backend: csv_file +# This can be anything smart_open can handle, a local directory or +# an S3 bucket, etc., but importing to ClickHouse only supports S3 right now +# https://pypi.org/project/smart-open/ + +# The next two lines point to the same place, but each needs a different URL +# format. +csv_output_destination: s3://openedx-aspects-loadtest/logs/large_test/ +s3_source_location: https://openedx-aspects-loadtest.s3.amazonaws.com/logs/large_test/ +csv_load_from_s3_after: true + +# ClickHouse Backend configuration +# ################################ +db_host: localhost +db_port: 8443 +db_name: xapi_lt +db_event_sink_name: event_sink +db_username: ch_admin +db_password: test +s3_key: ... +s3_secret: ... 
+ +# Run options +log_dir: logs +num_batches: 3 +batch_size: 100 + +# Overall start and end date for the entire run +start_date: 2014-01-01 +end_date: 2023-11-27 + +# All courses will be this long, and be fit into the start / end dates +# This must be less than end_date - start_date days. +course_length_days: 120 + +# The size of the test +num_organizations: 3 +num_actors: 10 + +# How many of each size course to create. The sum of these is the total number +# of courses created for the test. +num_course_sizes: + small: 1 + medium: 1 + large: 1 + huge: 1 + +# Course size configurations, how many of each type of object are created for +# each course of this size. "actors" must be less than or equal to "num_actors". +# For a course of this size to be created it needs to exist both here and in +# "num_course_sizes". +course_size_makeup: + small: + actors: 5 + problems: 20 + videos: 10 + chapters: 3 + sequences: 10 + verticals: 20 + forum_posts: 20 + medium: + actors: 7 + problems: 40 + videos: 20 + chapters: 4 + sequences: 20 + verticals: 30 + forum_posts: 40 + large: + actors: 10 + problems: 80 + videos: 30 + chapters: 5 + sequences: 40 + verticals: 80 + forum_posts: 200 + huge: + actors: 10 + problems: 160 + videos: 40 + chapters: 10 + sequences: 50 + verticals: 100 + forum_posts: 1000 diff --git a/logs/.gitkeep b/logs/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/logs/get_times.py b/logs/get_times.py deleted file mode 100644 index 89a4a43..0000000 --- a/logs/get_times.py +++ /dev/null @@ -1,46 +0,0 @@ -with open("ralph_clickhouse_100M_2/ralph_clickhouse_100m_10kbatch.txt", "r") as f: - print( - """# Automatically generated from get_times.py - - -import datetime - -a = ( - """ - ) - - # count = 0 - - # First few timestamps are startup related - for line in f.readlines()[4:]: - if line.startswith("[('"): - # count += 1 - - # Every 10th and 11th timestamp are query timing related - # if count % 11 == 0 or count % 12 == 0: - # continue - print(line.strip() + ",") - -print( - """ -) - -times = [x[0][1] for x in a] - -prev_time = times[1] - -durations = [] -count = 1 -for t in times[2:]: - count += 1 - - if count % 11 == 0 or count % 12 == 0: - prev_time = t - continue - - durations.append(str((t - prev_time).seconds)) - prev_time = t - -print("\\n".join(durations)) -""" -) diff --git a/logs/parse_log_for_query_times.py b/logs/parse_log_for_query_times.py deleted file mode 100644 index 3631446..0000000 --- a/logs/parse_log_for_query_times.py +++ /dev/null @@ -1,80 +0,0 @@ -queries = { - "Count of enrollment events for course": [], - "Count of total enrollment events for org": [], - # "for this learner" for Clickhouse, "for this actor" for some others... 
- "Count of enrollments for this learner": [], - "Count of enrollments for this course - count of unenrollments, last 30 days": [], - "Count of enrollments for this course - count of unenrollments, all time": [], - "Count of enrollments for all courses - count of unenrollments, last 5 minutes": [], -} - -# Just used for testing output -log = """2022-12-09 16:59:47.500233 -8890 of 10000 -2022-12-09 17:01:11.173771 -8900 of 10000 -2022-12-09 17:02:37.400290 -Count of enrollment events for course http://localhost:18000/course/course-v1:salsaX+DemoX+81fcb08f-218c-454f-ad33-fe7c6e784ecb -28 -Completed in: 0.014149 -================================= -Count of total enrollment events for org chipX -190824 -Completed in: 122.453695 -================================= -Count of enrollments for this actor c29df261-45b5-4f32-85cc-56c54297f315 -0 -Completed in: 114.205501 -================================= -2022-12-09 17:06:44.088138 -Collection count: -89010000 -8910 of 10000 -2022-12-09 17:10:25.374361 -8920 of 10000 -2022-12-09 17:12:23.670774 -""".splitlines() - - -def go(): - # fname = "citus_100M_columnar_cluster_no_partition.txt" - # fname = "clickhouse_100M.txt" - # fname = "mongo_100M_4indexes.txt" - # fname = "ralph_mongo_100M.txt" - fname = "ralph_100M_json_obj_no_buffer.txt" - with open(fname, "r") as logf: - log = logf.readlines() - x = -1 - for line in log: - print(line) - x += 1 - for start in queries: - # print(start) - if line.startswith(start): - queries[start].append(x) - - output = {} - for start in queries: - output[start] = [] - print(f"{start}: {len(queries[start])}") - - # 3 lines after the start phrase is found is the time output line - for base_line in queries[start]: - # 'Completed in: 0.006565\n' - - offset = 3 - time_str = log[base_line + offset].strip() - time = time_str[13:] - - output[start].append(time) - # print(f"Base line: {log[base_line]}") - # print(f"Base + 2: {log[base_line + 2]}") - # print("-------") - # break - - for start in output: - print(start) - print("\n".join(output[start])) - - -go() diff --git a/private_configs/.gitkeep b/private_configs/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/requirements/base.in b/requirements/base.in index 42c7c8b..f1d545d 100644 --- a/requirements/base.in +++ b/requirements/base.in @@ -3,4 +3,6 @@ click clickhouse-connect>0.5<0.7 +pyyaml requests +smart_open[s3] diff --git a/requirements/base.txt b/requirements/base.txt index 1b3587c..46eb458 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -4,6 +4,12 @@ # # make upgrade # +boto3==1.33.12 + # via smart-open +botocore==1.33.12 + # via + # boto3 + # s3transfer certifi==2023.11.17 # via # clickhouse-connect @@ -16,14 +22,29 @@ clickhouse-connect==0.6.22 # via -r requirements/base.in idna==3.6 # via requests +jmespath==1.0.1 + # via + # boto3 + # botocore lz4==4.3.2 # via clickhouse-connect +python-dateutil==2.8.2 + # via botocore pytz==2023.3.post1 # via clickhouse-connect +pyyaml==6.0.1 + # via -r requirements/base.in requests==2.31.0 # via -r requirements/base.in -urllib3==2.1.0 +s3transfer==0.8.2 + # via boto3 +six==1.16.0 + # via python-dateutil +smart-open[s3]==6.4.0 + # via -r requirements/base.in +urllib3==1.26.18 # via + # botocore # clickhouse-connect # requests zstandard==0.22.0 diff --git a/requirements/dev.txt b/requirements/dev.txt index 195e7cb..b20ae9f 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -4,15 +4,20 @@ # # make upgrade # -annotated-types==0.6.0 - # via - # -r requirements/quality.txt - # pydantic 
astroid==3.0.1 # via # -r requirements/quality.txt # pylint # pylint-celery +boto3==1.33.12 + # via + # -r requirements/quality.txt + # smart-open +botocore==1.33.12 + # via + # -r requirements/quality.txt + # boto3 + # s3transfer build==1.0.3 # via # -r requirements/pip-tools.txt @@ -21,19 +26,11 @@ cachetools==5.3.2 # via # -r requirements/ci.txt # tox -cerberus==1.3.5 - # via - # -r requirements/quality.txt - # plette certifi==2023.11.17 # via # -r requirements/quality.txt # clickhouse-connect # requests -cffi==1.16.0 - # via - # -r requirements/quality.txt - # cryptography chardet==5.2.0 # via # -r requirements/ci.txt @@ -70,10 +67,6 @@ coverage[toml]==7.3.2 # -r requirements/quality.txt # coverage # pytest-cov -cryptography==41.0.7 - # via - # -r requirements/quality.txt - # secretstorage diff-cover==8.0.1 # via -r requirements/dev.in dill==0.3.7 @@ -83,13 +76,7 @@ dill==0.3.7 distlib==0.3.7 # via # -r requirements/ci.txt - # -r requirements/quality.txt - # requirementslib # virtualenv -docopt==0.6.2 - # via - # -r requirements/quality.txt - # pipreqs docutils==0.20.1 # via # -r requirements/quality.txt @@ -124,7 +111,7 @@ iniconfig==2.0.0 # via # -r requirements/quality.txt # pytest -isort==5.13.0 +isort==5.13.1 # via # -r requirements/quality.txt # pylint @@ -132,16 +119,16 @@ jaraco-classes==3.3.0 # via # -r requirements/quality.txt # keyring -jeepney==0.8.0 - # via - # -r requirements/quality.txt - # keyring - # secretstorage jinja2==3.1.2 # via # -r requirements/quality.txt # code-annotations # diff-cover +jmespath==1.0.1 + # via + # -r requirements/quality.txt + # boto3 + # botocore keyring==24.3.0 # via # -r requirements/quality.txt @@ -187,20 +174,8 @@ pbr==6.0.0 # via # -r requirements/quality.txt # stevedore -pep517==0.13.1 - # via - # -r requirements/quality.txt - # requirementslib -pip-api==0.0.30 - # via - # -r requirements/quality.txt - # isort pip-tools==7.3.0 # via -r requirements/pip-tools.txt -pipreqs==0.4.13 - # via - # -r requirements/quality.txt - # isort pkginfo==1.9.6 # via # -r requirements/quality.txt @@ -210,14 +185,8 @@ platformdirs==4.1.0 # -r requirements/ci.txt # -r requirements/quality.txt # pylint - # requirementslib # tox # virtualenv -plette[validation]==0.4.4 - # via - # -r requirements/quality.txt - # plette - # requirementslib pluggy==1.3.0 # via # -r requirements/ci.txt @@ -227,18 +196,6 @@ pluggy==1.3.0 # tox pycodestyle==2.11.1 # via -r requirements/quality.txt -pycparser==2.21 - # via - # -r requirements/quality.txt - # cffi -pydantic==2.5.2 - # via - # -r requirements/quality.txt - # requirementslib -pydantic-core==2.14.5 - # via - # -r requirements/quality.txt - # pydantic pydocstyle==6.3.0 # via -r requirements/quality.txt pygments==2.17.2 @@ -247,7 +204,7 @@ pygments==2.17.2 # diff-cover # readme-renderer # rich -pylint==3.0.2 +pylint==3.0.3 # via # -r requirements/quality.txt # edx-lint @@ -281,6 +238,10 @@ pytest==7.4.3 # pytest-cov pytest-cov==4.1.0 # via -r requirements/quality.txt +python-dateutil==2.8.2 + # via + # -r requirements/quality.txt + # botocore python-slugify==8.0.1 # via # -r requirements/quality.txt @@ -301,17 +262,11 @@ requests==2.31.0 # via # -r requirements/quality.txt # requests-toolbelt - # requirementslib # twine - # yarg requests-toolbelt==1.0.0 # via # -r requirements/quality.txt # twine -requirementslib==3.0.0 - # via - # -r requirements/quality.txt - # isort rfc3986==2.0.0 # via # -r requirements/quality.txt @@ -320,14 +275,19 @@ rich==13.7.0 # via # -r requirements/quality.txt # twine -secretstorage==3.3.3 
+s3transfer==0.8.2 # via # -r requirements/quality.txt - # keyring + # boto3 six==1.16.0 # via # -r requirements/quality.txt # edx-lint + # python-dateutil +smart-open[s3]==6.4.0 + # via + # -r requirements/quality.txt + # smart-open snowballstemmer==2.2.0 # via # -r requirements/quality.txt @@ -347,7 +307,6 @@ tomli==2.0.1 # -r requirements/quality.txt # build # coverage - # pep517 # pip-tools # pylint # pyproject-api @@ -357,9 +316,7 @@ tomli==2.0.1 tomlkit==0.12.3 # via # -r requirements/quality.txt - # plette # pylint - # requirementslib tox==4.11.4 # via -r requirements/ci.txt twine==4.0.2 @@ -367,15 +324,13 @@ twine==4.0.2 typing-extensions==4.9.0 # via # -r requirements/quality.txt - # annotated-types # astroid - # pydantic - # pydantic-core # pylint # rich -urllib3==2.1.0 +urllib3==1.26.18 # via # -r requirements/quality.txt + # botocore # clickhouse-connect # requests # twine @@ -387,10 +342,6 @@ wheel==0.42.0 # via # -r requirements/pip-tools.txt # pip-tools -yarg==0.1.9 - # via - # -r requirements/quality.txt - # pipreqs zipp==3.17.0 # via # -r requirements/pip-tools.txt diff --git a/requirements/doc.txt b/requirements/doc.txt index 42cae72..5bf5495 100644 --- a/requirements/doc.txt +++ b/requirements/doc.txt @@ -14,6 +14,15 @@ babel==2.13.1 # sphinx beautifulsoup4==4.12.2 # via pydata-sphinx-theme +boto3==1.33.12 + # via + # -r requirements/test.txt + # smart-open +botocore==1.33.12 + # via + # -r requirements/test.txt + # boto3 + # s3transfer build==1.0.3 # via -r requirements/doc.in certifi==2023.11.17 @@ -21,8 +30,6 @@ certifi==2023.11.17 # -r requirements/test.txt # clickhouse-connect # requests -cffi==1.16.0 - # via cryptography charset-normalizer==3.3.2 # via # -r requirements/test.txt @@ -36,8 +43,6 @@ coverage[toml]==7.3.2 # -r requirements/test.txt # coverage # pytest-cov -cryptography==41.0.7 - # via secretstorage docutils==0.19 # via # pydata-sphinx-theme @@ -67,12 +72,13 @@ iniconfig==2.0.0 # pytest jaraco-classes==3.3.0 # via keyring -jeepney==0.8.0 - # via - # keyring - # secretstorage jinja2==3.1.2 # via sphinx +jmespath==1.0.1 + # via + # -r requirements/test.txt + # boto3 + # botocore keyring==24.3.0 # via twine lz4==4.3.2 @@ -102,8 +108,6 @@ pluggy==1.3.0 # via # -r requirements/test.txt # pytest -pycparser==2.21 - # via cffi pydata-sphinx-theme==0.14.4 # via sphinx-book-theme pygments==2.17.2 @@ -121,11 +125,17 @@ pytest==7.4.3 # pytest-cov pytest-cov==4.1.0 # via -r requirements/test.txt +python-dateutil==2.8.2 + # via + # -r requirements/test.txt + # botocore pytz==2023.3.post1 # via # -r requirements/test.txt # babel # clickhouse-connect +pyyaml==6.0.1 + # via -r requirements/test.txt readme-renderer==42.0 # via twine requests==2.31.0 @@ -140,8 +150,18 @@ rfc3986==2.0.0 # via twine rich==13.7.0 # via twine -secretstorage==3.3.3 - # via keyring +s3transfer==0.8.2 + # via + # -r requirements/test.txt + # boto3 +six==1.16.0 + # via + # -r requirements/test.txt + # python-dateutil +smart-open[s3]==6.4.0 + # via + # -r requirements/test.txt + # smart-open snowballstemmer==2.2.0 # via sphinx soupsieve==2.5 @@ -178,9 +198,10 @@ typing-extensions==4.9.0 # via # pydata-sphinx-theme # rich -urllib3==2.1.0 +urllib3==1.26.18 # via # -r requirements/test.txt + # botocore # clickhouse-connect # requests # twine diff --git a/requirements/quality.txt b/requirements/quality.txt index 5a27f1a..bee1a65 100644 --- a/requirements/quality.txt +++ b/requirements/quality.txt @@ -4,21 +4,24 @@ # # make upgrade # -annotated-types==0.6.0 - # via pydantic astroid==3.0.1 # via # 
pylint # pylint-celery -cerberus==1.3.5 - # via plette +boto3==1.33.12 + # via + # -r requirements/test.txt + # smart-open +botocore==1.33.12 + # via + # -r requirements/test.txt + # boto3 + # s3transfer certifi==2023.11.17 # via # -r requirements/test.txt # clickhouse-connect # requests -cffi==1.16.0 - # via cryptography charset-normalizer==3.3.2 # via # -r requirements/test.txt @@ -40,14 +43,8 @@ coverage[toml]==7.3.2 # -r requirements/test.txt # coverage # pytest-cov -cryptography==41.0.7 - # via secretstorage dill==0.3.7 # via pylint -distlib==0.3.7 - # via requirementslib -docopt==0.6.2 - # via pipreqs docutils==0.20.1 # via readme-renderer edx-lint==5.3.6 @@ -70,18 +67,19 @@ iniconfig==2.0.0 # via # -r requirements/test.txt # pytest -isort==5.13.0 +isort==5.13.1 # via # -r requirements/quality.in # pylint jaraco-classes==3.3.0 # via keyring -jeepney==0.8.0 - # via - # keyring - # secretstorage jinja2==3.1.2 # via code-annotations +jmespath==1.0.1 + # via + # -r requirements/test.txt + # boto3 + # botocore keyring==24.3.0 # via twine lz4==4.3.2 @@ -106,39 +104,23 @@ packaging==23.2 # pytest pbr==6.0.0 # via stevedore -pep517==0.13.1 - # via requirementslib -pip-api==0.0.30 - # via isort -pipreqs==0.4.13 - # via isort pkginfo==1.9.6 # via twine platformdirs==4.1.0 - # via - # pylint - # requirementslib -plette[validation]==0.4.4 - # via requirementslib + # via pylint pluggy==1.3.0 # via # -r requirements/test.txt # pytest pycodestyle==2.11.1 # via -r requirements/quality.in -pycparser==2.21 - # via cffi -pydantic==2.5.2 - # via requirementslib -pydantic-core==2.14.5 - # via pydantic pydocstyle==6.3.0 # via -r requirements/quality.in pygments==2.17.2 # via # readme-renderer # rich -pylint==3.0.2 +pylint==3.0.3 # via # edx-lint # pylint-celery @@ -158,6 +140,10 @@ pytest==7.4.3 # pytest-cov pytest-cov==4.1.0 # via -r requirements/test.txt +python-dateutil==2.8.2 + # via + # -r requirements/test.txt + # botocore python-slugify==8.0.1 # via code-annotations pytz==2023.3.post1 @@ -165,28 +151,35 @@ pytz==2023.3.post1 # -r requirements/test.txt # clickhouse-connect pyyaml==6.0.1 - # via code-annotations + # via + # -r requirements/test.txt + # code-annotations readme-renderer==42.0 # via twine requests==2.31.0 # via # -r requirements/test.txt # requests-toolbelt - # requirementslib # twine - # yarg requests-toolbelt==1.0.0 # via twine -requirementslib==3.0.0 - # via isort rfc3986==2.0.0 # via twine rich==13.7.0 # via twine -secretstorage==3.3.3 - # via keyring +s3transfer==0.8.2 + # via + # -r requirements/test.txt + # boto3 six==1.16.0 - # via edx-lint + # via + # -r requirements/test.txt + # edx-lint + # python-dateutil +smart-open[s3]==6.4.0 + # via + # -r requirements/test.txt + # smart-open snowballstemmer==2.2.0 # via pydocstyle stevedore==5.1.0 @@ -197,32 +190,24 @@ tomli==2.0.1 # via # -r requirements/test.txt # coverage - # pep517 # pylint # pytest tomlkit==0.12.3 - # via - # plette - # pylint - # requirementslib + # via pylint twine==4.0.2 # via -r requirements/quality.in typing-extensions==4.9.0 # via - # annotated-types # astroid - # pydantic - # pydantic-core # pylint # rich -urllib3==2.1.0 +urllib3==1.26.18 # via # -r requirements/test.txt + # botocore # clickhouse-connect # requests # twine -yarg==0.1.9 - # via pipreqs zipp==3.17.0 # via # importlib-metadata @@ -231,7 +216,3 @@ zstandard==0.22.0 # via # -r requirements/test.txt # clickhouse-connect - -# The following packages are considered to be unsafe in a requirements file: -# pip -# setuptools diff --git 
a/requirements/test.txt b/requirements/test.txt index 7e87cac..28f680e 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -4,6 +4,15 @@ # # make upgrade # +boto3==1.33.12 + # via + # -r requirements/base.txt + # smart-open +botocore==1.33.12 + # via + # -r requirements/base.txt + # boto3 + # s3transfer certifi==2023.11.17 # via # -r requirements/base.txt @@ -29,6 +38,11 @@ idna==3.6 # requests iniconfig==2.0.0 # via pytest +jmespath==1.0.1 + # via + # -r requirements/base.txt + # boto3 + # botocore lz4==4.3.2 # via # -r requirements/base.txt @@ -41,19 +55,38 @@ pytest==7.4.3 # via pytest-cov pytest-cov==4.1.0 # via -r requirements/test.in +python-dateutil==2.8.2 + # via + # -r requirements/base.txt + # botocore pytz==2023.3.post1 # via # -r requirements/base.txt # clickhouse-connect +pyyaml==6.0.1 + # via -r requirements/base.txt requests==2.31.0 # via -r requirements/base.txt +s3transfer==0.8.2 + # via + # -r requirements/base.txt + # boto3 +six==1.16.0 + # via + # -r requirements/base.txt + # python-dateutil +smart-open[s3]==6.4.0 + # via + # -r requirements/base.txt + # smart-open tomli==2.0.1 # via # coverage # pytest -urllib3==2.1.0 +urllib3==1.26.18 # via # -r requirements/base.txt + # botocore # clickhouse-connect # requests zstandard==0.22.0 diff --git a/setup.py b/setup.py index ed91a1e..45da794 100644 --- a/setup.py +++ b/setup.py @@ -15,12 +15,13 @@ include_package_data=True, entry_points=""" [console_scripts] - xapi-db-load=xapi_db_load.main:load_db + xapi-db-load=xapi_db_load.main:cli """, install_requires=[ "click", "clickhouse-connect >= 0.5, < 0.7", "requests", + "smart_open[s3]", ], url="https://github.com/openedx/xapi-db-load", project_urls={ diff --git a/tests/fixtures/small_clickhouse_config.yaml b/tests/fixtures/small_clickhouse_config.yaml new file mode 100644 index 0000000..6518863 --- /dev/null +++ b/tests/fixtures/small_clickhouse_config.yaml @@ -0,0 +1,40 @@ +# Test configuration for ClickHouse +# ################################# +backend: clickhouse +db_host: localhost +db_port: null +db_name: xapi +db_username: ch_admin +db_password: foo + +# Run options +log_dir: logs +num_batches: 3 +batch_size: 100 + +# Overall start and end date for the entire run +start_date: 2014-01-01 +end_date: 2023-11-27 + +# All courses will be this long, and be fit into the start / end dates +# This must be less than end_date - start_date days. +course_length_days: 120 + +# The size of the test +num_organizations: 3 +num_actors: 10 + +# How many of each size course to create. The sum of these is the total number +# of courses created for the test. +num_course_sizes: + small: 1 + +course_size_makeup: + small: + actors: 5 + problems: 20 + videos: 10 + chapters: 3 + sequences: 10 + verticals: 20 + forum_posts: 20 diff --git a/tests/fixtures/small_config.yaml b/tests/fixtures/small_config.yaml new file mode 100644 index 0000000..c5c1c43 --- /dev/null +++ b/tests/fixtures/small_config.yaml @@ -0,0 +1,38 @@ +# Test configuration +# ######################### +backend: csv_file +# This will be overwritten with a tempfile location +csv_output_destination: logs/tests/ +csv_load_from_s3_after: false + +# This will be overwritten to a temp path in tests +log_dir: logs +num_batches: 3 +batch_size: 100 + +# Overall start and end date for the entire run +start_date: 2014-01-01 +end_date: 2023-11-27 + +# All courses will be this long, and be fit into the start / end dates +# This must be less than end_date - start_date days. 
+course_length_days: 120 + +# The size of the test +num_organizations: 3 +num_actors: 10 + +# How many of each size course to create. The sum of these is the total number +# of courses created for the test. +num_course_sizes: + small: 10 + +course_size_makeup: + small: + actors: 5 + problems: 20 + videos: 10 + chapters: 3 + sequences: 10 + verticals: 20 + forum_posts: 20 diff --git a/tests/fixtures/small_ralph_config.yaml b/tests/fixtures/small_ralph_config.yaml new file mode 100644 index 0000000..aee8aa0 --- /dev/null +++ b/tests/fixtures/small_ralph_config.yaml @@ -0,0 +1,43 @@ +# Test configuration for Ralph / ClickHouse +# ######################################### +backend: ralph_clickhouse +db_host: localhost +db_port: null +db_name: xapi +db_username: ch_admin +db_password: foo +lrs_url: https://ralph/xAPI/statements +lrs_username: ralph +lrs_password: foo + +# Run options +log_dir: logs +num_batches: 3 +batch_size: 100 + +# Overall start and end date for the entire run +start_date: 2014-01-01 +end_date: 2023-11-27 + +# All courses will be this long, and be fit into the start / end dates +# This must be less than end_date - start_date days. +course_length_days: 120 + +# The size of the test +num_organizations: 3 +num_actors: 10 + +# How many of each size course to create. The sum of these is the total number +# of courses created for the test. +num_course_sizes: + small: 1 + +course_size_makeup: + small: + actors: 5 + problems: 20 + videos: 10 + chapters: 3 + sequences: 10 + verticals: 20 + forum_posts: 20 diff --git a/tests/test_xapi-db-load.py b/tests/test_xapi-db-load.py index b629a54..80bf0d6 100644 --- a/tests/test_xapi-db-load.py +++ b/tests/test_xapi-db-load.py @@ -3,52 +3,109 @@ """ import gzip import os +from contextlib import contextmanager from unittest.mock import patch +import yaml from click.testing import CliRunner from xapi_db_load.main import load_db +@contextmanager +def override_config(config_path, tmpdir): + """ + Override the config file with runtime variables (temp file paths, etc). + + Overrides for both the test code and the loading code. + """ + with open(config_path, "r") as f: + test_config = yaml.safe_load(f) + + test_config["log_dir"] = str(tmpdir) + test_config["csv_output_destination"] = str(tmpdir) + + with patch('xapi_db_load.main.get_config') as mock_config: + mock_config.return_value = test_config + try: + yield test_config + finally: + pass + + def test_csv(tmpdir): - test_path = tmpdir.mkdir("test_csv") - runner = CliRunner() - result = runner.invoke( - load_db, - "--backend=csv_file " - "--num_batches=3 " - "--batch_size=5 " - f'--csv_output_directory="{test_path}"', - ) - assert "Done! Added 15 rows!" 
in result.output - assert "Total run time" in result.output + test_path = "tests/fixtures/small_config.yaml" + + with override_config(test_path, tmpdir) as test_config: + runner = CliRunner() + result = runner.invoke( + load_db, + f"--config_file {test_path}", + catch_exceptions=False + ) + + assert "Currently written row count" in result.output + assert "Done" in result.output + assert "Total run time" in result.output + + makeup = test_config["course_size_makeup"]["small"] - with gzip.open(os.path.join(test_path, "xapi.csv.gz"), "r") as csv: - assert len(csv.readlines()) == 15 - with gzip.open(os.path.join(test_path, "courses.csv.gz"), "r") as csv: - assert len(csv.readlines()) >= 1 - with gzip.open(os.path.join(test_path, "blocks.csv.gz"), "r") as csv: - assert len(csv.readlines()) > 1 + expected_enrollments = test_config["num_course_sizes"]["small"] * makeup["actors"] + expected_statements = test_config["num_batches"] * test_config["batch_size"] + expected_enrollments + expected_actors = test_config["num_actors"] + expected_courses = test_config["num_course_sizes"]["small"] + # We want all the configured block types, which are currently everything in + # the config except the actor and forum post count + expected_course_blocks = sum(makeup.values()) - makeup["actors"] - makeup["forum_posts"] -@patch("xapi_db_load.main.ralph") -def test_ralph_clickhouse(mock_ralph): + # Plus 1 for the course block + expected_blocks = (expected_course_blocks + 1) * expected_courses + + for prefix, expected in ( + ("xapi", expected_statements), + ("courses", expected_courses), + ("blocks", expected_blocks), + ("external_ids", expected_actors), + ("user_profiles", expected_actors) + ): + with gzip.open(os.path.join(test_config["log_dir"], f"{prefix}.csv.gz"), "r") as csv: + assert len(csv.readlines()) == expected, f"Bad row count in csv file {prefix}.csv.gz." + + +@patch("xapi_db_load.backends.clickhouse_lake.clickhouse_connect") +def test_clickhouse_lake(_, tmpdir): + test_path = "tests/fixtures/small_clickhouse_config.yaml" + + with override_config(test_path, tmpdir): + runner = CliRunner() + result = runner.invoke( + load_db, + f"--config_file {test_path}", + catch_exceptions=False, + ) + + assert "Done." in result.output + assert "55 enrollment events inserted." in result.output + assert "Done! Added 300 rows!" in result.output + assert "Total run time" in result.output + + +@patch("xapi_db_load.backends.ralph_lrs.requests") +@patch("xapi_db_load.backends.clickhouse_lake.clickhouse_connect") +def test_ralph_clickhouse(mock_requests, _, tmpdir): + test_path = "tests/fixtures/small_ralph_config.yaml" runner = CliRunner() - result = runner.invoke( - load_db, - "--backend=ralph_clickhouse " - "--num_batches=3 " - "--batch_size=5 " - "--db_host=fake " - "--db_name=fake " - "--db_username=fake " - "--db_password=fake " - "--lrs_url=fake " - "--lrs_username=fake " - "--lrs_password=fake", - catch_exceptions=False, - ) - print(mock_ralph.mock_calls) + + with override_config(test_path, tmpdir): + result = runner.invoke( + load_db, + f"--config_file {test_path}", + catch_exceptions=False, + ) + print(mock_requests.mock_calls) print(result.output) - assert "Done! Added 15 rows!" in result.output + assert "Done." in result.output + assert "60 enrollment events inserted." in result.output + assert "Done! Added 300 rows!" 
in result.output assert "Total run time" in result.output diff --git a/xapi_db_load/__init__.py b/xapi_db_load/__init__.py index 2530707..90b824a 100644 --- a/xapi_db_load/__init__.py +++ b/xapi_db_load/__init__.py @@ -2,4 +2,4 @@ Scripts to generate fake xAPI data against various backends. """ -__version__ = "0.9" +__version__ = "1.0" diff --git a/xapi_db_load/backends/clickhouse_lake.py b/xapi_db_load/backends/clickhouse_lake.py index eddc4b4..1145ebf 100644 --- a/xapi_db_load/backends/clickhouse_lake.py +++ b/xapi_db_load/backends/clickhouse_lake.py @@ -1,8 +1,7 @@ """ ClickHouse data lake implementation. """ - -import random +import os import uuid from datetime import datetime @@ -16,24 +15,18 @@ class XAPILakeClickhouse: client = None - def __init__( - self, - db_host="localhost", - db_port=18123, - db_username="default", - db_password=None, - db_name=None, - ): - self.host = db_host - self.port = db_port - self.username = db_username - self.database = db_name - self.db_password = db_password - - self.event_raw_table_name = "xapi_events_all" - self.event_table_name = "xapi_events_all_parsed" - self.event_table_name_mv = "xapi_events_all_parsed_mv" - self.get_org_function_name = "get_org_from_course_url" + def __init__(self, config): + self.host = config.get("db_host", "localhost") + self.port = config.get("db_port", "18123") + self.username = config.get("db_username", "default") + self.database = config.get("db_name", "xapi") + self.event_sink_database = config.get("db_event_sink_name", "event_sink") + self.db_password = config.get("db_password") + self.s3_key = config.get("s3_key") + self.s3_secret = config.get("s3_secret") + + self.event_raw_table_name = config.get("event_raw_table_name", "xapi_events_all") + self.event_table_name = config.get("event_table_name", "xapi_events_all_parsed") self.set_client() def set_client(self): @@ -76,141 +69,6 @@ def print_row_counts(self): res = self.client.query(f"SELECT count(*) FROM {self.event_table_name}") print(res.result_set) - def create_db(self): - """ - Create the destination database if it doesn't exist. - """ - self.client.command(f"CREATE DATABASE IF NOT EXISTS {self.database}") - - def drop_tables(self): - """ - Drop existing table structures. - """ - self.client.command(f"DROP TABLE IF EXISTS {self.event_raw_table_name}") - self.client.command(f"DROP TABLE IF EXISTS {self.event_table_name}") - self.client.command(f"DROP FUNCTION IF EXISTS {self.get_org_function_name}") - self.client.command(f"DROP TABLE IF EXISTS {self.event_table_name_mv}") - print("Tables dropped") - - def create_tables(self): - """ - Create the base xAPI tables and top level materialized views. - - In the future we should manage this through the scripts in tutor-contrib-aspects to keep the - table structures compatible. 
- """ - sql = f""" - CREATE TABLE IF NOT EXISTS {self.event_raw_table_name} ( - event_id UUID NOT NULL, - emission_time DateTime64(6) NOT NULL, - event JSON NOT NULL, - event_str String NOT NULL, - ) - ENGINE MergeTree ORDER BY ( - emission_time, - event_id) - PRIMARY KEY (emission_time, event_id) - """ - - print(sql) - self.client.command(sql) - - sql = f""" - CREATE TABLE IF NOT EXISTS {self.event_table_name} ( - event_id UUID NOT NULL, - verb_id String NOT NULL, - actor_id String NOT NULL, - object_id String NOT NULL, - org String NOT NULL, - course_id String NOT NULL, - emission_time DateTime64(6) NOT NULL, - event_str String NOT NULL - ) ENGINE MergeTree - ORDER BY (org, course_id, verb_id, actor_id, emission_time, event_id) - PRIMARY KEY (org, course_id, verb_id, actor_id, emission_time, event_id); - """ - - print(sql) - self.client.command(sql) - - sql = f""" - CREATE FUNCTION IF NOT EXISTS {self.get_org_function_name} AS (course_url) -> - nullIf(EXTRACT(course_url, 'course-v1:([a-zA-Z0-9]*)'), '') - ;""" - - print(sql) - self.client.command(sql) - - sql = f""" - CREATE MATERIALIZED VIEW IF NOT EXISTS {self.event_table_name_mv} - TO {self.event_table_name} AS - SELECT - event_id as event_id, - JSON_VALUE(event_str, '$.verb.id') as verb_id, - JSON_VALUE(event_str, '$.actor.account.name') as actor_id, - JSON_VALUE(event_str, '$.object.id') as object_id, - -- If the contextActivities parent is a course, use that. Otherwise use the object id for the course id - if( - JSON_VALUE( - event_str, - '$.context.contextActivities.parent[0].definition.type') - = 'http://adlnet.gov/expapi/activities/course', - JSON_VALUE(event_str, '$.context.contextActivities.parent[0].id'), - JSON_VALUE(event_str, '$.object.id') - ) as course_id, - {self.get_org_function_name}(course_id) as org, - emission_time as emission_time, - event_str as event_str - FROM {self.event_raw_table_name}; - """ - print(sql) - self.client.command(sql) - - sql = """ - CREATE TABLE IF NOT EXISTS event_sink.course_overviews - ( - org String, - course_key String, - display_name String, - course_start String, - course_end String, - enrollment_start String, - enrollment_end String, - self_paced Bool, - course_data_json String, - created String, - modified String, - dump_id UUID, - time_last_dumped String - ) - engine = MergeTree PRIMARY KEY (org, course_key, modified, time_last_dumped) - ORDER BY (org, course_key, modified, time_last_dumped); - """ - - print(sql) - self.client.command(sql) - - sql = """ - CREATE TABLE IF NOT EXISTS event_sink.course_blocks - ( - org String, - course_key String, - location String, - display_name String, - xblock_data_json String, - order Int32 default 0, - edited_on String, - dump_id UUID, - time_last_dumped String - ) - engine = MergeTree PRIMARY KEY (org, course_key, location, edited_on) - ORDER BY (org, course_key, location, edited_on, order); - """ - print(sql) - self.client.command(sql) - - print("Tables created") - def batch_insert(self, events): """ Insert a batch of events to ClickHouse. 
@@ -233,14 +91,8 @@ def batch_insert(self, events): ) VALUES {vals} """ - # Sometimes the connection randomly dies, this gives us a second shot in that case - try: - self.client.command(sql) - except clickhouse_connect.driver.exceptions.OperationalError: - print("ClickHouse OperationalError, trying to reconnect.") - self.set_client() - print("Retrying insert...") - self.client.command(sql) + + self._insert_sql_with_retry(sql) def insert_event_sink_course_data(self, courses): """ @@ -275,31 +127,11 @@ def insert_event_sink_course_data(self, courses): raise vals = ",".join(out_data) sql = f""" - INSERT INTO event_sink.course_overviews ( - org, - course_key, - display_name, - course_start, - course_end, - enrollment_start, - enrollment_end, - self_paced, - course_data_json, - created, - modified, - dump_id, - time_last_dumped - ) + INSERT INTO {self.event_sink_database}.course_overviews VALUES {vals} """ - # Sometimes the connection randomly dies, this gives us a second shot in that case - try: - self.client.command(sql) - except clickhouse_connect.driver.exceptions.OperationalError: - print("ClickHouse OperationalError, trying to reconnect.") - self.set_client() - print("Retrying insert...") - self.client.command(sql) + + self._insert_sql_with_retry(sql) def insert_event_sink_block_data(self, courses): """ @@ -332,27 +164,124 @@ def insert_event_sink_block_data(self, courses): vals = ",".join(out_data) sql = f""" - INSERT INTO event_sink.course_blocks ( - org, - course_key, - location, - display_name, - xblock_data_json, - order, - edited_on, - dump_id, - time_last_dumped - ) + INSERT INTO {self.event_sink_database}.course_blocks VALUES {vals} """ - # Sometimes the connection randomly dies, this gives us a second shot in that case - try: - self.client.command(sql) - except clickhouse_connect.driver.exceptions.OperationalError: - print("ClickHouse OperationalError, trying to reconnect.") - self.set_client() - print("Retrying insert...") - self.client.command(sql) + + self._insert_sql_with_retry(sql) + + def insert_event_sink_actor_data(self, actors): + """ + Insert the user_profile and external_id data to ClickHouse. + + This allows us to test PII reports. + """ + out_external_id = [] + out_profile = [] + for actor in actors: + dump_id = str(uuid.uuid4()) + dump_time = datetime.utcnow() + + id_row = f"""( + '{actor.id}', + 'xapi', + '{actor.username}', + '{actor.user_id}', + '{dump_id}', + '{dump_time}' + )""" + out_external_id.append(id_row) + + # This first column is usually the MySQL row pk, we just + # user this for now to have a unique id. + profile_row = f"""( + '{actor.user_id}', + '{actor.user_id}', + '{actor.name}', + '{actor.meta}', + '{actor.courseware}', + '{actor.language}', + '{actor.location}', + '{actor.year_of_birth}', + '{actor.gender}', + '{actor.level_of_education}', + '{actor.mailing_address}', + '{actor.city}', + '{actor.country}', + '{actor.state}', + '{actor.goals}', + '{actor.bio}', + '{actor.profile_image_uploaded_at}', + '{actor.phone_number}', + '{dump_id}', + '{dump_time}' + )""" + + out_profile.append(profile_row) + + # Now do the actual inserts... + vals = ",".join(out_external_id) + sql = f""" + INSERT INTO {self.event_sink_database}.external_id + VALUES {vals} + """ + self._insert_sql_with_retry(sql) + + vals = ",".join(out_profile) + sql = f""" + INSERT INTO {self.event_sink_database}.user_profile + VALUES {vals} + """ + self._insert_sql_with_retry(sql) + + def _insert_sql_with_retry(self, sql): + """ + Wrap insert commands with a single retry. 
+ """ + # Sometimes the connection randomly dies, this gives us a second shot in that case + try: + self.client.command(sql) + except clickhouse_connect.driver.exceptions.OperationalError: + print("ClickHouse OperationalError, trying to reconnect.") + self.set_client() + print("Retrying insert...") + self.client.command(sql) + except clickhouse_connect.driver.exceptions.DatabaseError: + print("ClickHouse DatabaseError:") + print(sql) + raise + + def load_from_s3(self, s3_location): + """ + Load generated csv.gz files from S3. + + This does a bulk file insert directly from S3 to ClickHouse, so files + never get downloaded directly to the local process. + """ + loads = ( + (f"{self.event_sink_database}.course_overviews", os.path.join(s3_location, "courses.csv.gz")), + (f"{self.event_sink_database}.course_blocks", os.path.join(s3_location, "blocks.csv.gz")), + (f"{self.event_sink_database}.external_id", os.path.join(s3_location, "external_ids.csv.gz")), + (f"{self.event_sink_database}.user_profile", os.path.join(s3_location, "user_profiles.csv.gz")), + (f"{self.database}.{self.event_raw_table_name}", os.path.join(s3_location, "xapi.csv.gz")) + ) + + for table_name, file_path in loads: + print(f"Inserting into {table_name}") + + sql = f""" + INSERT INTO {table_name} + SELECT * + FROM s3('{file_path}', '{self.s3_key}', '{self.s3_secret}', 'CSV'); + """ + + self.client.command(sql) + self.print_db_time() + + def finalize(self): + """ + Nothing to finalize here. + """ def _run_query_and_print(self, query_name, query): """ @@ -372,10 +301,10 @@ def do_queries(self, event_generator): Query data from the table and document how long the query runs (while the insert script is running). """ # Get our randomly selected targets for this run - course = random.choice(event_generator.known_courses) + course = event_generator.get_course() course_url = course.course_url - org = random.choice(event_generator.known_orgs) - actor = random.choice(event_generator.known_actor_uuids) + org = event_generator.get_org() + actor = course.get_enrolled_actor().actor.id self._run_query_and_print( "Count of enrollment events for course {course_url}", @@ -398,7 +327,7 @@ def do_queries(self, event_generator): ) self._run_query_and_print( - "Count of enrollments for this learner", + "Count of enrollments for this actor", f""" select count(*) from {self.event_table_name} @@ -459,98 +388,3 @@ def do_queries(self, event_generator): and emission_time between date_sub(MINUTE, 5, now('UTC')) and now('UTC')) as b """, ) - - def do_distributions(self): - """ - Execute and print the timing of distribution queries to enable comparisons across runs. 
- """ - self._run_query_and_print( - "Count of courses", - f""" - select count(distinct course_id) - from {self.event_table_name} - """, - ) - - self._run_query_and_print( - "Count of learners", - f""" - select count(distinct actor_id) - from {self.event_table_name} - """, - ) - - self._run_query_and_print( - "Count of verbs", - f""" - select count(*), verb_id - from {self.event_table_name} - group by verb_id - """, - ) - - self._run_query_and_print( - "Count of orgs", - f""" - select count(*), org - from {self.event_table_name} - group by org - """, - ) - - self._run_query_and_print( - "Avg, min, max students per course", - f""" - select avg(a.num_students) as avg_students, - min(a.num_students) as min_students, - max(a.num_students) max_students - from ( - select count(distinct actor_id) as num_students - from {self.event_table_name} - group by course_id - ) a - """, - ) - - self._run_query_and_print( - "Avg, min, max problems per course", - f""" - select avg(a.num_problems) as avg_problems, min(a.num_problems) as min_problems, - max(a.num_problems) max_problems - from ( - select count(distinct object_id) as num_problems - from {self.event_table_name} - where JSON_VALUE(event_str, '$.object.definition.type') = - 'http://adlnet.gov/expapi/activities/cmi.interaction' - group by course_id - ) a - """, - ) - - self._run_query_and_print( - "Avg, min, max videos per course", - f""" - select avg(a.num_videos) as avg_videos, min(a.num_videos) as min_videos, - max(a.num_videos) max_videos - from ( - select count(distinct object_id) as num_videos - from {self.event_table_name} - where JSON_VALUE(event_str, '$.object.definition.type') = - 'https://w3id.org/xapi/video/activity-type/video' - group by object_id - ) a - """, - ) - - self._run_query_and_print( - "Random event by id", - f""" - select * - from {self.event_table_name} - where event_id = ( - select event_id - from {self.event_table_name} - limit 1 - ) - """, - ) diff --git a/xapi_db_load/backends/csv.py b/xapi_db_load/backends/csv.py index a4af013..5b228fb 100644 --- a/xapi_db_load/backends/csv.py +++ b/xapi_db_load/backends/csv.py @@ -4,29 +4,35 @@ This can be used to generate a gzipped csv of events that can be loaded into any system. """ import csv -import gzip import os import uuid from datetime import datetime +from smart_open import open as smart + class XAPILakeCSV: """ CSV fake data lake implementation. """ - def __init__(self, output_directory): - # This isn't really a database, so just faking out all of this. 
- self.xapi_csv_writer = self._get_csv_handle("xapi", output_directory) - self.course_csv_writer = self._get_csv_handle("courses", output_directory) - self.blocks_csv_writer = self._get_csv_handle("blocks", output_directory) + def __init__(self, config): + output_destination = config['csv_output_destination'] + + self.xapi_csv_handle, self.xapi_csv_writer = self._get_csv_handle("xapi", output_destination) + self.course_csv_handle, self.course_csv_writer = self._get_csv_handle("courses", output_destination) + self.blocks_csv_handle, self.blocks_csv_writer = self._get_csv_handle("blocks", output_destination) + self.profile_csv_handle, self.profile_csv_writer = self._get_csv_handle("user_profiles", output_destination) + self.external_id_csv_handle, self.external_id_csv_writer = self._get_csv_handle( + "external_ids", output_destination + ) self.row_count = 0 - def _get_csv_handle(self, file_type, output_directory): - out_filepath = os.path.join(output_directory, f"{file_type}.csv.gz") - out_filehandle = gzip.open(out_filepath, "wt") - return csv.writer(out_filehandle) + def _get_csv_handle(self, file_type, output_destination): + out_filepath = os.path.join(output_destination, f"{file_type}.csv.gz") + file_handle = smart(out_filepath, "w", compression=".gz") + return file_handle, csv.writer(file_handle) def print_db_time(self): """ @@ -61,7 +67,7 @@ def batch_insert(self, events): Write a batch of rows to the CSV. """ for v in events: - out = (v["event_id"], v["emission_time"], str(v["event"])) + out = (v["event_id"], v["emission_time"], '', str(v["event"])) self.xapi_csv_writer.writerow(out) self.row_count += len(events) @@ -110,14 +116,57 @@ def insert_event_sink_block_data(self, courses): dump_time )) - def do_queries(self, event_generator): + def insert_event_sink_actor_data(self, actors): """ - Execute queries, not needed here. + Write out the user profile data and external id files. """ + for actor in actors: + dump_id = str(uuid.uuid4()) + dump_time = datetime.utcnow() + + self.external_id_csv_writer.writerow(( + actor.id, + "xapi", + actor.username, + actor.user_id, + dump_id, + dump_time, + )) - def do_distributions(self): + self.profile_csv_writer.writerow(( + # This first column is usually the MySQL row pk, we just + # user this for now to have a unique id. + actor.user_id, + actor.user_id, + actor.name, + actor.meta, + actor.courseware, + actor.language, + actor.location, + actor.year_of_birth, + actor.gender, + actor.level_of_education, + actor.mailing_address, + actor.city, + actor.country, + actor.state, + actor.goals, + actor.bio, + actor.profile_image_uploaded_at, + actor.phone_number, + dump_id, + dump_time + )) + + def finalize(self): """ - Execute distribution queries, not needed here. + Close file handles so that they can be readable on import. + """ + self.xapi_csv_handle.close() + self.course_csv_handle.close() + self.blocks_csv_handle.close() - But this is the last step, so take the opportunity to close the file. + def do_queries(self, event_generator): + """ + Execute queries, not needed here. """ diff --git a/xapi_db_load/backends/ralph_lrs.py b/xapi_db_load/backends/ralph_lrs.py index 9841648..8b8967d 100644 --- a/xapi_db_load/backends/ralph_lrs.py +++ b/xapi_db_load/backends/ralph_lrs.py @@ -30,21 +30,11 @@ class XAPILRSRalphClickhouse(XAPILakeClickhouse): Wraps the XAPILakeClickhouse backend so that queries can be run against it while using Ralph to do the insertion. 
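    Beyond the ClickHouse connection settings consumed by the parent class,
    the config dict is expected to carry the LRS settings read in __init__
    below; a rough sketch (values are examples only):

        config = {
            "backend": "ralph_clickhouse",
            "lrs_url": "http://localhost:8100/",
            "lrs_username": "ralph",
            "lrs_password": "a password",
            # ...ClickHouse and common generation settings...
        }
        lake = XAPILRSRalphClickhouse(config)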
""" - def __init__( - self, - db_host, - lrs_url, - lrs_username, - lrs_password, - db_port=18123, - db_username="default", - db_password=None, - db_name="xapi", - ): - super().__init__(db_host, db_port, db_username, db_password, db_name) - self.lrs_url = lrs_url - self.lrs_username = lrs_username - self.lrs_password = lrs_password + def __init__(self, config): + super().__init__(config) + self.lrs_url = config["lrs_url"] + self.lrs_username = config["lrs_username"] + self.lrs_password = config["lrs_password"] def batch_insert(self, events): """ @@ -59,4 +49,8 @@ def batch_insert(self, events): json=out_data, headers={"Content-Type": "application/json"}, ) - resp.raise_for_status() + try: + resp.raise_for_status() + except requests.HTTPError: + print(json.dumps(out_data)) + raise diff --git a/xapi_db_load/course_configs.py b/xapi_db_load/course_configs.py index 47728d6..5a63a6e 100644 --- a/xapi_db_load/course_configs.py +++ b/xapi_db_load/course_configs.py @@ -1,31 +1,360 @@ """ Configuration values for emulating courses of various sizes. """ +import datetime +import json +import random +import uuid +from collections import namedtuple +from random import choice, randrange +EnrolledActor = namedtuple("Actor", ["actor", "enroll_datetime"]) -class CourseConfigSmall: - items = (10, 20) - problems = (10, 20) - videos = (5, 10) - sequences = (5, 10) +class Actor: + """ + Wrapper for actor PII data. -class CourseConfigMedium: - items = (20, 40) - problems = (20, 40) - videos = (10, 20) - sequences = (10, 20) + These are a combination of fields from edx-platform UserProfile and + ExternalId models. These fields are largely unpopulated in real life, + especially after the introduction of the profile MFE, but operators have + the capability to fill them in various ways. + """ + def __init__(self, user_id): + # Integer user id, just the counter from actor population + self.user_id = user_id -class CourseConfigLarge: - items = (40, 80) - problems = (40, 80) - videos = (10, 30) - sequences = (20, 40) + # "external_id" UUID + self.id = str(uuid.uuid4()) + # LMS username + self.username = f"actor_{self.user_id}" -class CourseConfigWhopper: - items = (80, 200) - problems = (80, 160) - videos = (10, 40) - sequences = (40, 80) + # These may or may not ever be populated in real life, potentially + # useful values are populated here. + self.name = f"Actor {user_id}" + self.year_of_birth = random.randint(1900, 2010) + self.gender = random.choice(["", "m", "f", "o"]) + self.level_of_education = random.choice(["", "p", "m", "b", "none", "other"]) + self.country = random.choice(["", "US", "CO", "AU", "IN", "PK"]) + self.goals = "" + self.bio = "" + + # These will probably never be populated, and aren't expected to be used + # but are part of the event sink and table + self.meta = "{}" + self.courseware = "" + self.language = "" + self.location = "" + self.mailing_address = "" + self.city = "" + self.state = "" + self.profile_image_uploaded_at = "" + self.phone_number = "" + + +class RandomCourse: + """ + Holds "known objects" and configuration values for a fake course. 
+ """ + + items_in_course = 0 + chapter_ids = [] + sequential_ids = [] + vertical_ids = [] + problem_ids = [] + video_ids = [] + forum_post_ids = [] + actors = [] + start_date = None + end_date = None + + def __init__( + self, + org, + overall_start_date, + overall_end_date, + course_length, + actors, + course_config_name, + course_size_makeup + ): + self.course_uuid = str(uuid.uuid4())[:6] + self.course_name = f"{self.course_uuid} ({course_config_name})" + self.org = org + self.course_id = f"course-v1:{org}+DemoX+{self.course_uuid}" + self.course_url = f"http://localhost:18000/course/{self.course_id}" + + delta = datetime.timedelta(days=course_length) + self.start_date = self._random_datetime(overall_start_date, overall_end_date - delta) + self.end_date = self.start_date + delta + + self.actors = [ + EnrolledActor(a, self._random_datetime(self.start_date, self.end_date)) + for a in actors + ] + + self.course_config_name = course_config_name + self.course_config = course_size_makeup + self.configure() + + def __repr__(self): + return f"""{self.course_name}: + {self.start_date} - {self.end_date} + {self.course_config} + """ + + def configure(self): + """ + Set up the fake course configuration such as course length, start and end dates, and size. + """ + self.chapter_ids = [ + self._generate_random_block_type_id("chapter") + for _ in range(self.course_config["chapters"]) + ] + + self.sequential_ids = [ + self._generate_random_block_type_id("sequential") + for _ in range(self.course_config["sequences"]) + ] + + self.vertical_ids = [ + self._generate_random_block_type_id("vertical") + for _ in range(self.course_config["verticals"]) + ] + + self.problem_ids = [ + self._generate_random_block_type_id("problem") + for _ in range(self.course_config["problems"]) + ] + + self.video_ids = [ + self._generate_random_block_type_id("video") + for _ in range(self.course_config["videos"]) + ] + + self.forum_post_ids = [ + self._generate_random_forum_post_id() + for _ in range(self.course_config["forum_posts"]) + ] + + for config in ("videos", "problems", "verticals", "sequences", "chapters", "forum_posts"): + self.items_in_course += self.course_config[config] + + def get_random_emission_time(self, actor=None): + """ + Randomizes an emission time for events that falls within the course start and end dates. + """ + if actor: + start = actor.enroll_datetime + else: + start = self.start_date + + # Make sure we're passing in a datetime, not a date + start = datetime.datetime.combine(start, datetime.time()) + + # time() is midnight, so make sure we get that last day in there + end = datetime.datetime.combine(self.end_date, datetime.time()) + datetime.timedelta(days=1) + + return self._random_datetime( + start_datetime=start, end_datetime=end + ) + + @staticmethod + def _random_datetime(start_datetime=None, end_datetime=None): + """ + Create a random datetime within the given boundaries. + + If no start date is given, we start 5 years ago. + If no end date is given, we end now. + """ + if not end_datetime: + end_datetime = datetime.datetime.utcnow() + if not start_datetime: + start_datetime = end_datetime - datetime.timedelta(days=365 * 5) + + delta = end_datetime - start_datetime + int_delta = (delta.days * 24 * 60 * 60) + delta.seconds + random_second = randrange(int_delta) + return start_datetime + datetime.timedelta(seconds=random_second) + + def get_enrolled_actor(self): + """ + Return an actor from those known in this course. 
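        The return value is an EnrolledActor namedtuple; the xAPI statement
        classes use it roughly like this:

            enrolled_actor = course.get_enrolled_actor()
            actor_id = enrolled_actor.actor.id
            emission_time = course.get_random_emission_time(enrolled_actor)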
+ """ + return choice(self.actors) + + def get_video_id(self): + """ + Return a video id from our list of known video ids. + """ + return choice(self.video_ids) + + def _generate_random_block_type_id(self, block_type): + block_uuid = str(uuid.uuid4())[:8] + return f"http://localhost:18000/xblock/block-v1:{self.course_id}+type@{block_type}+block@{block_uuid}" + + def get_problem_id(self): + """ + Return a problem id from our list of known problem ids. + """ + return choice(self.problem_ids) + + def get_random_sequential_id(self): + """ + Return a sequential id from our list of known sequential ids. + """ + return choice(self.sequential_ids) + + def get_random_forum_post_id(self): + """ + Return a sequential id from our list of known sequential ids. + """ + return choice(self.forum_post_ids) + + def _generate_random_forum_post_id(self): + thread_id = str(uuid.uuid4())[:8] + return f"http://localhost:18000/api/discussion/v1/threads/{thread_id}" + + def get_random_nav_location(self): + """ + Return a navigation location from our list of known ids. + """ + return str(randrange(1, self.items_in_course)) + + def serialize_course_data_for_event_sink(self): + """ + Return a dict representing the course data from event-sink-clickhouse. + """ + return { + "org": self.org, + "course_key": self.course_id, + "display_name": self.course_name, + "course_start": self.start_date, + "course_end": self.end_date, + "enrollment_start": self.start_date, + "enrollment_end": self.end_date, + "self_paced": choice([True, False]), + # This is a catchall field, we don't currently use it + "course_data_json": "{}", + "created": self.start_date, + "modified": self.end_date + } + + def _serialize_block(self, block_type, block_id, cnt): + return { + "org": self.org, + "course_key": self.course_id, + "location": block_id.split("/xblock/")[-1], + "display_name": f"{block_type.title()} {cnt}", + # This gets appended with location data below + "xblock_data_json": {"block_type": block_type}, + "order": cnt, + "edited_on": self.end_date + } + + def _serialize_course_block(self): + location_course_id = self.course_id.replace("course-v1:", "") + return { + "org": self.org, + "course_key": self.course_id, + "location": f"block-v1:{location_course_id}+type@course+block@course", + "display_name": f"Course {self.course_uuid[:5]}", + # This gets appended with location data below + "xblock_data_json": {"block_type": "course"}, + "order": 1, + "edited_on": self.end_date + } + + def serialize_block_data_for_event_sink(self): + """ + Return a list of dicts representing all blocks in this course. + + The data format mirrors what is created by event-sink-clickhouse. 
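        Each returned entry is a dict shaped like the output of
        _serialize_block, for example (values illustrative, locations follow
        the fake block ids generated above):

            {
                "org": "Org0",
                "course_key": "course-v1:Org0+DemoX+abc123",
                "location": "block-v1:course-v1:Org0+DemoX+abc123+type@video+block@12ab34cd",
                "display_name": "Video 1",
                "xblock_data_json": '{"block_type": "video", "section": 1, "subsection": 1, "unit": 1}',
                "order": 1,
                "edited_on": self.end_date,
            }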
+ + Block types we care about: + -- x course block + -- x video block + -- x vertical block + -- static_tab block + -- x sequential + -- x problem block + -- html block + -- discussion block + -- course_info + -- x chapter block + -- about block + """ + blocks = [] + cnt = 1 + + # Get all of our blocks in order + for v in self.video_ids: + blocks.append(self._serialize_block("video", v, cnt)) + cnt += 1 + for p in self.problem_ids: + blocks.append(self._serialize_block("problem", p, cnt)) + cnt += 1 + + course_structure = [self._serialize_course_block()] + + for c in self.chapter_ids: + course_structure.append(self._serialize_block("chapter", c, cnt)) + cnt += 1 + + for s in self.sequential_ids: + # Randomly insert some sequentials under the chapters + course_structure.insert( + # Start at 2 here to make sure it's after the course and first + # chapter block + random.randint(2, len(course_structure)), + self._serialize_block("sequential", s, cnt) + ) + cnt += 1 + + for v in self.vertical_ids: + # Randomly insert some verticals under the sequentials + course_structure.insert( + # Start at 3 here to make sure it's after the course and first + # chapter block and first sequential block + random.randint(2, len(course_structure)), + self._serialize_block("vertical", v, cnt) + ) + cnt += 1 + + # Now add in the blocks wherever, as long as they're after the + # course, first chapter, first sequential, and first vertical. After + # that they'll all be mixed together, but this will do for now. + for b in blocks: + course_structure.insert( + random.randint(4, len(course_structure)), + b + ) + + # Now actually set up the locations. These are important and used to + # generate block display names in the database + section_idx = 0 + subsection_idx = 0 + unit_idx = 0 + + for block in course_structure: + if block["display_name"].startswith("Chapter"): + section_idx += 1 + subsection_idx = 0 + unit_idx = 0 + elif block["display_name"].startswith("Sequential"): + subsection_idx += 1 + unit_idx = 0 + elif block["display_name"].startswith("Vertical"): + unit_idx += 1 + + block["xblock_data_json"].update({ + "section": section_idx, + "subsection": subsection_idx, + "unit": unit_idx, + }) + + block["xblock_data_json"] = json.dumps(block["xblock_data_json"]) + + return course_structure diff --git a/xapi_db_load/generate_load.py b/xapi_db_load/generate_load.py index 8ed1aaa..d3a9653 100644 --- a/xapi_db_load/generate_load.py +++ b/xapi_db_load/generate_load.py @@ -4,9 +4,12 @@ import datetime import pprint import uuid -from random import choice, choices, randrange +from random import choice, choices -from .course_configs import CourseConfigLarge, CourseConfigMedium, CourseConfigSmall, CourseConfigWhopper +from xapi_db_load.utils import LogTimer, setup_timing + +from .course_configs import Actor, RandomCourse +from .xapi.xapi_forum import PostCreated from .xapi.xapi_grade import CourseGradeCalculated, FirstTimePassed from .xapi.xapi_hint_answer import ShowAnswer, ShowHint from .xapi.xapi_navigation import LinkClicked, NextNavigation, PreviousNavigation, TabSelectedNavigation @@ -31,7 +34,7 @@ (Unregistered, 0.146), (CompletedVideo, 5.124), (LoadedVideo, 7.125), - (PlayedVideo, 24.519), + (PlayedVideo, 24.019), (PausedVideo, 14.912), (StoppedVideo, 3.671), (PositionChangedVideo, 12.105), @@ -47,287 +50,198 @@ (TranscriptEnabled, 0.05), (TranscriptDisabled, 0.05), (CourseGradeCalculated, 1.5), + (PostCreated, 0.5), ) EVENTS = [i[0] for i in EVENT_LOAD] EVENT_WEIGHTS = [i[1] for i in EVENT_LOAD] -# These determine 
the proportions of each size of course created -COURSE_CONFIG_WEIGHTS = ( - (CourseConfigSmall, 10), - (CourseConfigMedium, 50), - (CourseConfigLarge, 30), - (CourseConfigWhopper, 10), -) - -COURSE_CONFIGS = [i[0] for i in COURSE_CONFIG_WEIGHTS] -COURSE_CONFIG_WEIGHTS = [i[1] for i in COURSE_CONFIG_WEIGHTS] - -BATCH_SIZE = 100 - def _get_uuid(): return str(uuid.uuid4()) -def _get_random_thing( - thing, - func_for_new_thing=_get_uuid, - one_in_range=1000, - max_thing_length=1000000 -): +class EventGenerator: """ - Return a random instantiated object of the type requested. - - A new object will be created approximately one out of every "one_in_range" - calls to this function. Otherwise, an existing object will be returned. + Generates a batch of random xAPI events based on the EVENT_WEIGHTS proportions. """ - if (not thing or randrange(one_in_range) == 5) \ - and len(thing) < max_thing_length: - new_thing = func_for_new_thing() - thing.append(new_thing) - return new_thing - return choice(thing) + actors = [] + courses = [] + orgs = [] + def __init__(self, config): + self.config = config + self.start_date = config["start_date"] + self.end_date = config["end_date"] + self._validate_config() + self.setup_orgs() + self.setup_actors() + self.setup_courses() -class RandomCourse: - """ - Holds "known objects" and configuration values for a fake course. - """ - - items_in_course = 0 - known_problem_ids = [] - known_video_ids = [] - known_sequential_ids = [] - start_date = None - end_date = None - - def __init__(self, org, start_date, end_date): - self.course_uuid = str(uuid.uuid4()) - self.org = org - self.course_id = f"course-v1:{org}+DemoX+{self.course_uuid}" - self.course_url = f"http://localhost:18000/course/{self.course_id}" - - self.start_date = start_date - self.end_date = end_date - self.course_config = choices(COURSE_CONFIGS, COURSE_CONFIG_WEIGHTS)[0] - self.configure() - - def __repr__(self): - return f"""{self.course_uuid} ({str(self.course_config)}): - {self.start_date} - {self.end_date} - Items: {self.items_in_course} - Videos: {len(self.known_video_ids)} - Problems: {len(self.known_problem_ids)} - Sequences: {len(self.known_sequential_ids)} - """ - - def configure(self): + def _validate_config(self): """ - Set up the fake course configuration such as course length, start and end dates, and size. + Make sure the given values make sense. 
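        For example, a config with start_date 2023-01-01, end_date 2023-02-01
        and course_length_days 120 fails with:

            ValueError: The time between start and end dates must be longer than course_length_days.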
""" - self.items_in_course = randrange( - self.course_config.items[0], self.course_config.items[1] - ) - - self.known_problem_ids = [ - self._generate_random_problem_id() - for _ in range( - randrange( - self.course_config.problems[0], self.course_config.problems[1] - ) - ) - ] - - self.known_video_ids = [ - self._generate_random_video_id() - for _ in range( - randrange(self.course_config.videos[0], self.course_config.videos[1]) - ) - ] - - self.known_sequential_ids = [ - self._generate_random_sequential_id() - for _ in range( - randrange( - self.course_config.sequences[0], self.course_config.sequences[1] - ) - ) - ] - - def get_random_emission_time(self): + if self.start_date >= self.end_date: + raise ValueError("Start date must be before end date.") + + if (self.end_date - self.start_date).days < self.config["course_length_days"]: + raise ValueError("The time between start and end dates must be longer than course_length_days.") + + for s in self.config["num_course_sizes"]: + if self.config["course_size_makeup"][s]["actors"] > self.config["num_actors"]: + raise ValueError(f"Course size {s} wants more actors than are configured in num_actors.") + + def setup_orgs(self): """ - Randomizes an emission time for events that falls within the course start and end dates. + Create some random organizations based on the config. """ - return self._random_datetime( - start_datetime=self.start_date, end_datetime=self.end_date - ) + for i in range(self.config["num_organizations"]): + self.orgs.append(f"Org{i}") - @staticmethod - def _random_datetime(start_datetime=None, end_datetime=None): + def setup_courses(self): """ - Create a random datetime within the given boundaries. - - If no start date is given, we start 5 years ago. - If no end date is given, we end now. + Pre-create a number of courses based on the config. """ - if not end_datetime: - end_datetime = datetime.datetime.utcnow() - if not start_datetime: - start_datetime = end_datetime - datetime.timedelta(days=365 * 5) - - delta = end_datetime - start_datetime - int_delta = (delta.days * 24 * 60 * 60) + delta.seconds - random_second = randrange(int_delta) - return start_datetime + datetime.timedelta(seconds=random_second) + for course_config_name, num_courses in self.config["num_course_sizes"].items(): + print(f"Setting up {num_courses} {course_config_name} courses") + for _ in range(num_courses): + course_config_makeup = self.config["course_size_makeup"][course_config_name] + org = choice(self.orgs) + actors = choices(self.actors, k=course_config_makeup["actors"]) - def _generate_random_video_id(self): - video_uuid = str(uuid.uuid4()) - return f"http://localhost:18000/xblock/block-v1:{self.course_id}+type@video+block@{video_uuid}" + self.courses.append(RandomCourse( + org, + self.start_date, + self.end_date, + self.config["course_length_days"], + actors, + course_config_name, + course_config_makeup + )) - def get_video_id(self): + def setup_actors(self): """ - Return a video id from our list of known video ids. - """ - return choice(self.known_video_ids) - - def _generate_random_problem_id(self): - problem_uuid = str(uuid.uuid4()) - return f"http://localhost:18000/xblock/block-v1:{self.course_id}+type@problem+block@{problem_uuid}" + Create all known actors. - def get_problem_id(self): + Random samplings of these will be passed into courses. """ - Return a problem id from our list of known problem ids. 
+ self.actors = [Actor(i) for i in range(self.config["num_actors"])] + + def get_batch_events(self): """ - return choice(self.known_problem_ids) + Create a batch size list of random events. - def _generate_random_sequential_id(self): - sequential_uuid = str(uuid.uuid4()) - return f"http://localhost:18000/xblock/block-v1:{self.course_id}+type@sequential+block@{sequential_uuid}" + Events are from our EVENTS list, based on the EVENT_WEIGHTS proportions. + """ + events = choices(EVENTS, EVENT_WEIGHTS, k=self.config["batch_size"]) + return [e(self).get_data() for e in events] - def get_random_sequential_id(self): + def get_enrollment_events(self): """ - Return a sequential id from our list of known sequential ids. + Generate enrollment events for all actors. """ - return choice(self.known_sequential_ids) + enrollments = [] + for course in self.courses: + for actor in course.actors: + enrollments.append(Registered(self).get_data(course, actor)) + return enrollments - def get_random_nav_location(self): + def get_course(self): """ - Return a navigation location from our list of known ids. + Return a random course from our pre-built list. """ - return str(randrange(1, self.items_in_course)) + return choice(self.courses) - def serialize_course_data_for_event_sink(self): - """ - Return a dict representing the course data from event-sink-clickhouse. + def get_org(self): """ - return { - "org": self.org, - "course_key": self.course_id, - "display_name": f"Course {self.course_uuid[:5]}", - "course_start": self.start_date, - "course_end": self.end_date, - "enrollment_start": self.start_date, - "enrollment_end": self.end_date, - "self_paced": choice([True, False]), - # This is a catchall field, we don't currently use it - "course_data_json": "{}", - "created": self.start_date, - "modified": self.end_date - } - - def _serialize_block(self, block_type, block_id, cnt): - return { - "org": self.org, - "course_key": self.course_id, - "location": block_id.split("/xblock/")[-1], - "display_name": f"{block_type} {cnt}", - # This is a catchall field, we don't currently use it - "xblock_data_json": "{}", - "order": cnt, - "edited_on": self.end_date - } - - def _serialize_course_block(self): - location_course_id = self.course_id.replace("course-v1:", "") - return { - "org": self.org, - "course_key": self.course_id, - "location": f"block-v1:{location_course_id}+type@course+block@course", - "display_name": f"Course {self.course_uuid[:5]}", - # This is a catchall field, we don't currently use it - "xblock_data_json": "{}", - "order": 1, - "edited_on": self.end_date - } - - def serialize_block_data_for_event_sink(self): + Return a random org from our pre-built list. """ - Return a list of dicts representing all blocks in this course. + return choice(self.orgs) - The data format mirrors what is created by event-sink-clickhouse. + def dump_courses(self): + """ + Prettyprint all known courses. """ - blocks = [] - cnt = 1 - for v in self.known_video_ids: - blocks.append(self._serialize_block("Video", v, cnt)) - cnt += 1 - for p in self.known_problem_ids: - blocks.append(self._serialize_block("Problem", p, cnt)) - cnt += 1 - for s in self.known_sequential_ids: - blocks.append(self._serialize_block("Sequential", s, cnt)) - cnt += 1 - blocks.append(self._serialize_course_block()) - - return blocks + for c in self.courses: + pprint.pprint(c) -class EventGenerator: +def generate_events(config, backend): """ - Generates a batch of random xAPI events based on the EVENT_WEIGHTS proportions. 
+ Generate the actual events in the backend, using the given config. """ + setup_timing(config["log_dir"]) - known_actor_uuids = [] - known_courses = [] - known_orgs = ["openedX", "burritoX", "tacoX", "chipX", "salsaX", "guacX"] + print("Checking table existence and current row count in backend...") + backend.print_row_counts() + start = datetime.datetime.utcnow() - def __init__(self, batch_size, start_date, end_date): - self.batch_size = batch_size - self.start_date = start_date - self.end_date = end_date + with LogTimer("setup", "full_setup"): + with LogTimer("setup", "event_generator"): + event_generator = EventGenerator(config) + event_generator.dump_courses() - def get_batch_events(self): - """ - Create a batch size list of random events. + print("Inserting course metadata...") + with LogTimer("insert_metadata", "course"): + backend.insert_event_sink_course_data(event_generator.courses) - Events are from our EVENTS list, based on the EVENT_WEIGHTS proportions. - """ - events = choices(EVENTS, EVENT_WEIGHTS, k=self.batch_size) - return [e(self).get_data() for e in events] + print("Inserting block metadata...") + with LogTimer("insert_metadata", "blocks"): + backend.insert_event_sink_block_data(event_generator.courses) - def get_actor(self): - """ - Return a random actor. - """ - return _get_random_thing(self.known_actor_uuids) + print("Inserting user data...") + with LogTimer("insert_metadata", "user_data"): + backend.insert_event_sink_actor_data(event_generator.actors) - def _generate_random_course(self): - org = choice(self.known_orgs) - return RandomCourse(org, self.start_date, self.end_date) + insert_registrations(event_generator, backend) + insert_batches(event_generator, config["num_batches"], backend) - def get_course(self): - """ - Return a random course. - """ - return _get_random_thing( - self.known_courses, self._generate_random_course, one_in_range=10000 - ) + with LogTimer("batches", "total"): + print(f"Done! Added {config['num_batches'] * config['batch_size']:,} rows!") - def dump_courses(self): - """ - Prettyprint all known courses. - """ - for c in self.known_courses: - pprint.pprint(c) + end = datetime.datetime.utcnow() + print("Batch insert time: " + str(end - start)) + + backend.finalize() + backend.print_db_time() + backend.print_row_counts() + + end = datetime.datetime.utcnow() + print("Total run time: " + str(end - start)) + + +def insert_registrations(event_generator, lake): + """ + Insert all of the registration events. + """ + with LogTimer("enrollment", "get_enrollment_events"): + events = event_generator.get_enrollment_events() + + with LogTimer("enrollment", "insert_events"): + lake.batch_insert(events) + + print(f"{len(events)} enrollment events inserted.") + + +def insert_batches(event_generator, num_batches, lake): + """ + Generate and insert num_batches of events. + """ + for x in range(num_batches): + if x % 100 == 0: + print(f"{x} of {num_batches}") + lake.print_db_time() + + with LogTimer("batch", "get_events"): + events = event_generator.get_batch_events() + + with LogTimer("batch", "insert_events"): + lake.batch_insert(events) + + if x % 1000 == 0: + with LogTimer("batch", "all_queries"): + lake.do_queries(event_generator) + lake.print_db_time() + lake.print_row_counts() diff --git a/xapi_db_load/main.py b/xapi_db_load/main.py index 442b3a3..58636ae 100644 --- a/xapi_db_load/main.py +++ b/xapi_db_load/main.py @@ -1,212 +1,92 @@ """ Top level script to generate random xAPI events against various backends. 
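A rough sketch of the programmatic equivalent of the CLI commands defined
below (the config path is a placeholder):

    config = get_config("path/to/config.yaml")
    backend = get_backend_from_config(config)
    generate_events(config, backend)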
""" +import click +import yaml -import datetime +from xapi_db_load.generate_load import generate_events +from xapi_db_load.utils import get_backend_from_config -import click -from xapi_db_load.backends import clickhouse_lake as clickhouse -from xapi_db_load.backends import csv -from xapi_db_load.backends import ralph_lrs as ralph -from xapi_db_load.generate_load import EventGenerator -from xapi_db_load.utils import LogTimer, setup_timing +def get_config(config_file): + """ + Wrap around config loading. + + We override this in tests so that we can use temp dirs for logs etc. + """ + with open(config_file, 'r') as y: + return yaml.safe_load(y) + + +@click.group() +def cli(): + """ + Top level group of command objects. + """ @click.command() @click.option( - "--backend", + "--config_file", + help="Configuration file.", required=True, - type=click.Choice( - ["clickhouse", "ralph_clickhouse", "csv_file"], - case_sensitive=True, - ), - help="Which backend to run against", -) -@click.option( - "--num_batches", - default=1, - help="Number of batches to run, num_batches * batch_size is the total rows", -) -@click.option( - "--batch_size", - default=10000, - help="Number of rows to insert per batch, num_batches * batch_size is the total rows", -) -@click.option( - "--drop_tables_first", - default=False, - help="If True, the target tables will be dropped if they already exist", -) -@click.option( - "--distributions_only", - default=False, - help="Just run distribution queries and exit", -) -@click.option( - "--start_date", - default=(datetime.date.today() - datetime.timedelta(days=365)).strftime( - "%Y-%m-%d" - ), - type=click.DateTime(formats=["%Y-%m-%d"]), - help="Create events starting at this date, default to 1 yr ago. ex: 2020-11-30" -) -@click.option( - "--end_date", - default=(datetime.date.today() + datetime.timedelta(days=1)).strftime( - "%Y-%m-%d" - ), - type=click.DateTime(formats=["%Y-%m-%d"]), - help="Create events ending at this date, default to tomorrow. ex: 2020-11-31" -) -@click.option("--db_host", default="localhost", help="Database host name") -@click.option("--db_port", help="Database port") -@click.option("--db_name", default="xapi", help="Database name") -@click.option("--db_username", help="Database username") -@click.option( - "--db_password", help="Password for the database so it's not stored on disk" -) -@click.option("--lrs_url", default="http://localhost:8100/", help="URL to the LRS, if used") -@click.option("--lrs_username", default="ralph", help="LRS username") -@click.option("--lrs_password", help="Password for the LRS") -@click.option( - "--csv_output_directory", - help="Directory where the output files should be written when using the csv backend.", - type=click.Path(exists=True, dir_okay=True, file_okay=False, writable=True) -) -@click.option( - "--log_dir", - help="The directory to log timing information to.", - type=click.Path(exists=True, dir_okay=True, file_okay=False, writable=True) + default="default_config.yaml", + type=click.Path( + exists=True, + dir_okay=False, + file_okay=True, + writable=False + ) ) -def load_db( - backend, - num_batches, - batch_size, - drop_tables_first, - distributions_only, - start_date, - end_date, - db_host, - db_port, - db_name, - db_username, - db_password, - lrs_url, - lrs_username, - lrs_password, - csv_output_directory, - log_dir, -): +def load_db(config_file): """ - Execute the database load. + Execute a database load by performing inserts. 
""" - start = datetime.datetime.utcnow() - - if start_date >= end_date: - raise click.UsageError("Start date must be before end date.") - - # Since we're accepting pw on input we need a way to "None" it. - if db_password == " ": - db_password = None - - if backend == "clickhouse": - lake = clickhouse.XAPILakeClickhouse( - db_host=db_host, - db_port=db_port, - db_username=db_username, - db_password=db_password, - db_name=db_name, - ) - elif backend == "ralph_clickhouse": - lake = ralph.XAPILRSRalphClickhouse( - db_host=db_host, - db_port=db_port, - db_username=db_username, - db_password=db_password, - db_name=db_name, - lrs_url=lrs_url, - lrs_username=lrs_username, - lrs_password=lrs_password, - ) - elif backend == "csv_file": - if not csv_output_directory: - raise click.UsageError( - "--csv_output_directory must be provided for this backend." - ) - lake = csv.XAPILakeCSV(output_directory=csv_output_directory) - else: - raise NotImplementedError(f"Unknown backend {backend}.") - - # Sets up the timing logger. Here to prevent creating log files when - # running --help or other commands. - setup_timing(log_dir) - - if distributions_only: - with LogTimer("distributions", "do_distributiuon"): - lake.do_distributions() - print("Done!") - return - - with LogTimer("setup", "full_setup"): - if drop_tables_first: - with LogTimer("setup", "drop_tables"): - lake.drop_tables() - - with LogTimer("setup", "create_tables"): - lake.create_tables() - - with LogTimer("setup", "event_generator"): - event_generator = EventGenerator( - batch_size=batch_size, - start_date=start_date, - end_date=end_date - ) - - insert_batches(event_generator, num_batches, lake) - - print("Inserting course metadata...") - with LogTimer("insert_metadata", "course"): - lake.insert_event_sink_course_data(event_generator.known_courses) - print("Inserting block metadata...") - with LogTimer("insert_metadata", "blocks"): - lake.insert_event_sink_block_data(event_generator.known_courses) - - with LogTimer("batches", "total"): - print(f"Done! Added {num_batches * batch_size:,} rows!") - - end = datetime.datetime.utcnow() - print("Batch insert time: " + str(end - start)) - - lake.print_db_time() - lake.print_row_counts() - lake.do_distributions() - - end = datetime.datetime.utcnow() - print("Total run time: " + str(end - start)) - - -def insert_batches(event_generator, num_batches, lake): + config = get_config(config_file) + backend = get_backend_from_config(config) + generate_events(config, backend) + + try_s3_load = "csv_load_from_s3_after" in config and config["csv_load_from_s3_after"] + + if config["backend"] == "csv_file" and try_s3_load: + print("Attempting to load to ClickHouse from S3...") + config["backend"] = "clickhouse" + ch_backend = get_backend_from_config(config) + ch_backend.load_from_s3(config["s3_source_location"]) + elif try_s3_load: + print("Backend is not 'csv_file', skipping load from S3.") + + print("Done.") + + +@click.command() +@click.option( + "--config_file", + help="Configuration file.", + required=True, + default="default_config.yaml", + type=click.Path( + exists=True, + dir_okay=False, + file_okay=True, + writable=False + ) +) +def load_db_from_s3(config_file): """ - Generate and insert num_batches of events. + Execute the database by importing existing files from S3. 
""" - for x in range(num_batches): - if x % 100 == 0: - print(f"{x} of {num_batches}") - lake.print_db_time() + config = get_config(config_file) - with LogTimer("batch", "get_events"): - events = event_generator.get_batch_events() + if "clickhouse" not in config["backend"]: + raise click.BadParameter("You must use a ClickHouse based backend to load from S3.") - with LogTimer("batch", "insert_events"): - lake.batch_insert(events) + backend = get_backend_from_config(config) + backend.load_from_s3(config["s3_source_location"]) - if x % 1000 == 0: - with LogTimer("batch", "all_queries"): - lake.do_queries(event_generator) - lake.print_db_time() - lake.print_row_counts() +cli.add_command(load_db) +cli.add_command(load_db_from_s3) if __name__ == "__main__": - load_db() # pylint: disable=no-value-for-parameter + cli() diff --git a/xapi_db_load/utils.py b/xapi_db_load/utils.py index 057438a..5acc463 100644 --- a/xapi_db_load/utils.py +++ b/xapi_db_load/utils.py @@ -6,9 +6,36 @@ import os from datetime import datetime +from xapi_db_load.backends import clickhouse_lake as clickhouse +from xapi_db_load.backends import csv +from xapi_db_load.backends import ralph_lrs as ralph + timing = logging.getLogger("timing") +class ConfigurationError(Exception): + """ + Exception raised by backends when a configuration file is invalid. + """ + + +def get_backend_from_config(config): + """ + Return an instantiated backend from the given config dict. + """ + backend = config["backend"] + if backend == "clickhouse": + lake = clickhouse.XAPILakeClickhouse(config) + elif backend == "ralph_clickhouse": + lake = ralph.XAPILRSRalphClickhouse(config) + elif backend == "csv_file": + lake = csv.XAPILakeCSV(config) + else: + raise NotImplementedError(f"Unknown backend {backend}.") + + return lake + + def setup_timing(log_dir): """ Set up the timing logger. diff --git a/xapi_db_load/xapi/xapi_forum.py b/xapi_db_load/xapi/xapi_forum.py new file mode 100644 index 0000000..59c4ad2 --- /dev/null +++ b/xapi_db_load/xapi/xapi_forum.py @@ -0,0 +1,92 @@ +""" +Fake xAPI statements for various forum events. +""" +import json +from uuid import uuid4 + +from .xapi_common import XAPIBase + + +class BaseForum(XAPIBase): + """ + Base xAPI class for forum events. + """ + + def get_data(self): + """ + Generate and return the event dict, including xAPI statement as "event". + """ + # We generate registration events for every course and actor as part + # of startup, but also randomly through the events. + + event_id = str(uuid4()) + course = self.parent_load_generator.get_course() + enrolled_actor = course.get_enrolled_actor() + actor_id = enrolled_actor.actor.id + emission_time = course.get_random_emission_time(enrolled_actor) + post_id = course.get_random_forum_post_id() + + e = self.get_randomized_event( + event_id, actor_id, course, post_id, emission_time + ) + + return { + "event_id": event_id, + "verb": self.verb, + "actor_id": actor_id, + "org": course.org, + "course_run_id": course.course_url, + "emission_time": emission_time, + "event": e, + } + + def get_randomized_event(self, event_id, account, course, post_id, create_time): + """ + Given the inputs, return an xAPI statement. + + Currently all forum events are treated the same, so we're just creating + new posts. 
+ """ + event = { + "id": event_id, + "actor": { + "objectType": "Agent", + "account": {"homePage": "http://localhost:18000", "name": account}, + }, + "context": { + "contextActivities": { + "parent": [ + { + "id": course.course_url, + "objectType": "Activity", + "definition": { + "name": {"en-US": "Demonstration Course"}, + "type": "http://adlnet.gov/expapi/activities/course", + }, + } + ] + }, + "extensions": { + "https://w3id.org/xapi/openedx/extension/transformer-version": "event-routing-backends@7.0.1", + "https://w3id.org/xapi/openedx/extensions/session-id": "054c9ddcb76d2096f862e66bda3bc308", + "https://w3id.org/xapi/acrossx/extensions/type": "discussion" + } + }, + "object": { + "definition": { + "type": "http://id.tincanapi.com/activitytype/discussion" + }, + "id": post_id, + "objectType": "Activity" + }, + "timestamp": create_time.isoformat(), + "verb": {"display": {"en": self.verb_display}, "id": self.verb}, + "version": "1.0.3", + } + + return json.dumps(event) + + +class PostCreated(BaseForum): + verb = "https://w3id.org/xapi/acrossx/verbs/posted" + verb_display = "posted" diff --git a/xapi_db_load/xapi/xapi_grade.py b/xapi_db_load/xapi/xapi_grade.py index b7081ff..296b5ae 100644 --- a/xapi_db_load/xapi/xapi_grade.py +++ b/xapi_db_load/xapi/xapi_grade.py @@ -21,9 +21,10 @@ def get_data(self): Generate and return the event dict, including xAPI statement as "event". """ event_id = str(uuid4()) - actor_id = self.parent_load_generator.get_actor() course = self.parent_load_generator.get_course() - emission_time = course.get_random_emission_time() + enrolled_actor = course.get_enrolled_actor() + actor_id = enrolled_actor.actor.id + emission_time = course.get_random_emission_time(enrolled_actor) e = self.get_randomized_event(event_id, actor_id, course, emission_time) @@ -49,7 +50,8 @@ def get_randomized_event(self, event_id, account, course, create_time): }, "context": { "extensions": { - "https://github.com/openedx/event-routing-backends/blob/master/docs/xapi-extensions/eventVersion.rst": "1.0" # pylint: disable=line-too-long + "https://w3id.org/xapi/openedx/extension/transformer-version": "event-routing-backends@7.0.1", + "https://w3id.org/xapi/openedx/extensions/session-id": "e4858858443cd99828206e294587dac5" } }, "object": { @@ -83,9 +85,10 @@ def get_data(self): Generate and return the event dict, including xAPI statement as "event". """ event_id = str(uuid4()) - actor_id = self.parent_load_generator.get_actor() course = self.parent_load_generator.get_course() - emission_time = course.get_random_emission_time() + enrolled_actor = course.get_enrolled_actor() + actor_id = enrolled_actor.actor.id + emission_time = course.get_random_emission_time(enrolled_actor) e = self.get_randomized_event(event_id, actor_id, course, emission_time) return { diff --git a/xapi_db_load/xapi/xapi_hint_answer.py b/xapi_db_load/xapi/xapi_hint_answer.py index 7dbcddc..5c3043d 100644 --- a/xapi_db_load/xapi/xapi_hint_answer.py +++ b/xapi_db_load/xapi/xapi_hint_answer.py @@ -23,10 +23,11 @@ def get_data(self): Generate and return the event dict, including xAPI statement as "event". 
""" event_id = str(uuid4()) - actor_id = self.parent_load_generator.get_actor() course = self.parent_load_generator.get_course() + enrolled_actor = course.get_enrolled_actor() + actor_id = enrolled_actor.actor.id + emission_time = course.get_random_emission_time(enrolled_actor) problem_id = course.get_problem_id() - emission_time = course.get_random_emission_time() e = self.get_randomized_event( event_id, actor_id, course, problem_id, emission_time @@ -84,8 +85,9 @@ def get_randomized_event(self, event_id, account, course, problem_id, create_tim ] }, "extensions": { - "https://github.com/openedx/event-routing-backends/blob/master/docs/xapi-extensions/eventVersion.rst": "1.0" # pylint: disable=line-too-long - }, + "https://w3id.org/xapi/openedx/extension/transformer-version": "event-routing-backends@7.0.1", + "https://w3id.org/xapi/openedx/extensions/session-id": "e4858858443cd99828206e294587dac5" + } }, "timestamp": create_time.isoformat(), "verb": {"display": {"en": self.verb_display}, "id": self.verb}, diff --git a/xapi_db_load/xapi/xapi_navigation.py b/xapi_db_load/xapi/xapi_navigation.py index 4556a38..483e9e4 100644 --- a/xapi_db_load/xapi/xapi_navigation.py +++ b/xapi_db_load/xapi/xapi_navigation.py @@ -28,9 +28,10 @@ def get_data(self): Generate and return the event dict, including xAPI statement as "event". """ event_id = str(uuid4()) - actor_id = self.parent_load_generator.get_actor() course = self.parent_load_generator.get_course() - emission_time = course.get_random_emission_time() + enrolled_actor = course.get_enrolled_actor() + actor_id = enrolled_actor.actor.id + emission_time = course.get_random_emission_time(enrolled_actor) from_loc = self.from_loc or course.get_random_nav_location() to_loc = self.to_loc or course.get_random_nav_location() @@ -76,8 +77,9 @@ def get_randomized_event( ] }, "extensions": { - "https://github.com/openedx/event-routing-backends/blob/master/docs/xapi-extensions/eventVersion.rst": "1.0", # pylint: disable=line-too-long - }, + "https://w3id.org/xapi/openedx/extension/transformer-version": "event-routing-backends@7.0.1", + "https://w3id.org/xapi/openedx/extensions/session-id": "e4858858443cd99828206e294587dac5" + } }, "timestamp": create_time.isoformat(), "verb": {"display": {"en": self.verb_display}, "id": self.verb}, diff --git a/xapi_db_load/xapi/xapi_problem.py b/xapi_db_load/xapi/xapi_problem.py index e5383c3..3527b14 100644 --- a/xapi_db_load/xapi/xapi_problem.py +++ b/xapi_db_load/xapi/xapi_problem.py @@ -22,10 +22,11 @@ def get_data(self): Generate and return the event dict, including xAPI statement as "event". 
""" event_id = str(uuid4()) - actor_id = self.parent_load_generator.get_actor() course = self.parent_load_generator.get_course() + enrolled_actor = course.get_enrolled_actor() + actor_id = enrolled_actor.actor.id + emission_time = course.get_random_emission_time(enrolled_actor) problem_id = course.get_problem_id() - emission_time = course.get_random_emission_time() e = self.get_randomized_event( event_id, actor_id, course.course_url, problem_id, emission_time @@ -58,13 +59,31 @@ def get_randomized_event( } } - success = random.choice([True, False]) - response = "A correct answer" if success else "An incorrect answer" + response_options = [ + ("A correct answer", True), + ("An incorrect answer", False), + # FIXME: These aren't serializing correctly + # ('["A correct answer 1", "A correct answer 2"]', True), + # ('["A correct answer 1", "An incorrect answer 2"]', False), + ] + + response, success = random.choice(response_options) + attempts = random.randrange(1, 10) + + max_score = random.randint(1, 100) + raw_score = random.randint(0, max_score) + scaled_score = raw_score / max_score + score_obj = { + "scaled": scaled_score, + "raw": raw_score, + "min": 0.0, + "max": max_score + } server_object = { "object": { "definition": { - "extensions": {"http://id.tincanapi.com/extension/attempt-id": 10}, + "extensions": {"http://id.tincanapi.com/extension/attempt-id": attempts}, "description": { "en-US": "Add the question text, or prompt, here. This text is required." }, @@ -76,7 +95,7 @@ def get_randomized_event( }, "result": { "response": response, - "score": {"max": 1, "min": 0, "raw": 0, "scaled": 0}, + "score": score_obj, "success": success, }, } diff --git a/xapi_db_load/xapi/xapi_registration.py b/xapi_db_load/xapi/xapi_registration.py index 76aa010..948b957 100644 --- a/xapi_db_load/xapi/xapi_registration.py +++ b/xapi_db_load/xapi/xapi_registration.py @@ -2,6 +2,7 @@ Fake xAPI statements for various registration events. """ import json +from random import choice from uuid import uuid4 from .xapi_common import XAPIBase @@ -12,14 +13,22 @@ class BaseRegistration(XAPIBase): Base xAPI class for registration events. """ - def get_data(self): + def get_data(self, course=None, enrolled_actor=None): """ Generate and return the event dict, including xAPI statement as "event". """ + # We generate registration events for every course and actor as part + # of startup, but also randomly through the events, so sometimes we will + # have a course and actor, other times not. + if not course: + course = self.parent_load_generator.get_course() + + if not enrolled_actor: + enrolled_actor = course.get_enrolled_actor() + + actor_id = enrolled_actor.actor.id event_id = str(uuid4()) - actor_id = self.parent_load_generator.get_actor() - course = self.parent_load_generator.get_course() - emission_time = course.get_random_emission_time() + emission_time = course.get_random_emission_time(enrolled_actor) e = self.get_randomized_event( event_id, actor_id, course.course_url, emission_time @@ -39,6 +48,7 @@ def get_randomized_event(self, event_id, account, course_locator, create_time): """ Given the inputs, return an xAPI statement. 
""" + enrollment_mode = choice(("audit", "honor", "verified")) event = { "id": event_id, "actor": { @@ -47,13 +57,14 @@ def get_randomized_event(self, event_id, account, course_locator, create_time): }, "context": { "extensions": { - "https://github.com/openedx/event-routing-backends/blob/master/docs/xapi-extensions/eventVersion.rst": "1.0" # pylint: disable=line-too-long + "https://w3id.org/xapi/openedx/extension/transformer-version": "event-routing-backends@7.0.1", + "https://w3id.org/xapi/openedx/extensions/session-id": "e4858858443cd99828206e294587dac5" } }, "object": { "definition": { "extensions": { - "https://w3id.org/xapi/acrossx/extensions/type": "audit" + "https://w3id.org/xapi/acrossx/extensions/type": enrollment_mode }, "name": {"en": "Demonstration Course"}, "type": "http://adlnet.gov/expapi/activities/course", diff --git a/xapi_db_load/xapi/xapi_video.py b/xapi_db_load/xapi/xapi_video.py index babef41..d7884e5 100644 --- a/xapi_db_load/xapi/xapi_video.py +++ b/xapi_db_load/xapi/xapi_video.py @@ -2,6 +2,7 @@ Fake xAPI statements for various video events. """ import json +from random import randrange from uuid import uuid4 from .xapi_common import XAPIBase @@ -13,16 +14,20 @@ class BaseVideo(XAPIBase): """ enabled = False + caption = False + has_event_time = False + has_time_from_to = False def get_data(self): """ Generate and return the event dict, including xAPI statement as "event". """ event_id = str(uuid4()) - actor_id = self.parent_load_generator.get_actor() course = self.parent_load_generator.get_course() + enrolled_actor = course.get_enrolled_actor() + actor_id = enrolled_actor.actor.id video_id = course.get_video_id() - emission_time = course.get_random_emission_time() + emission_time = course.get_random_emission_time(enrolled_actor) e = self.get_randomized_event( event_id, actor_id, course, video_id, emission_time @@ -43,6 +48,15 @@ def get_randomized_event(self, event_id, account, course, video_id, create_time) """ Given the inputs, return an xAPI statement. 
""" + video_length = 195.0 + + if self.has_event_time: + video_event_time = float(randrange(0, 195)) + + if self.has_time_from_to: + video_event_time_from = float(randrange(0, 195)) + video_event_time_to = float(randrange(0, 195)) + event = { "id": event_id, "actor": { @@ -64,7 +78,7 @@ def get_randomized_event(self, event_id, account, course, video_id, create_time) }, "extensions": { "https://github.com/openedx/event-routing-backends/blob/master/docs/xapi-extensions/eventVersion.rst": "1.0", # pylint: disable=line-too-long - "https://w3id.org/xapi/video/extensions/length": 195.0, + "https://w3id.org/xapi/video/extensions/length": video_length, }, }, "object": { @@ -75,14 +89,21 @@ def get_randomized_event(self, event_id, account, course, video_id, create_time) "objectType": "Activity", }, "result": { - "extensions": {"https://w3id.org/xapi/video/extensions/time": 0.033} + "extensions": {} }, "timestamp": create_time.isoformat(), "verb": {"display": {"en": self.verb_display}, "id": self.verb}, "version": "1.0.3", } - if self.verb_display == "interacted": + if self.has_event_time: + event["result"]["extensions"]["https://w3id.org/xapi/video/extensions/time"] = video_event_time + + if self.has_time_from_to: + event["result"]["extensions"]["https://w3id.org/xapi/video/extensions/time-from"] = video_event_time_from + event["result"]["extensions"]["https://w3id.org/xapi/video/extensions/time-to"] = video_event_time_to + + if self.caption: event["result"]["extensions"]["https://w3id.org/xapi/video/extensions/cc-enabled"] = self.enabled return json.dumps(event) @@ -96,22 +117,26 @@ class LoadedVideo(BaseVideo): class PlayedVideo(BaseVideo): verb = "https://w3id.org/xapi/video/verbs/played" verb_display = "played" + has_event_time = True # TODO: These four technically need different structures, though we're not using them now. Update! class StoppedVideo(BaseVideo): verb = "http://adlnet.gov/expapi/verbs/terminated" verb_display = "terminated" + has_event_time = True class PausedVideo(BaseVideo): verb = "https://w3id.org/xapi/video/verbs/paused" verb_display = "paused" + has_event_time = True class PositionChangedVideo(BaseVideo): verb = "https://w3id.org/xapi/video/verbs/seeked" verb_display = "seeked" + has_time_from_to = True class CompletedVideo(BaseVideo): @@ -119,12 +144,24 @@ class CompletedVideo(BaseVideo): verb_display = "completed" +# Currently closed captions and transcripts use the same output events, so +# this technically covers both class TranscriptEnabled(BaseVideo): + """ + TranscriptEnabled event. + + This comment is needed for linting purposes. + """ + verb = "http://adlnet.gov/expapi/verbs/interacted" verb_display = "interacted" + caption = True enabled = True + has_event_time = True class TranscriptDisabled(BaseVideo): verb = "http://adlnet.gov/expapi/verbs/interacted" verb_display = "interacted" + caption = True + has_event_time = True