CSV import of large datasets can be very slow and memory-demanding.
The idea is to import data through the Akeneo REST API instead. The speed-up comes from parallelizing API calls: a multi-core processor can serve more concurrent requests than a single core, but because the calls are mostly network I/O, even a single-core CPU benefits from issuing them from multiple threads.
In addition, REST API calls avoid the memory issues of long-running PHP processes that use Doctrine.
- Generate product data and publish it to RabbitMQ.
- Configure queue consumers to send product data to Akeneo REST API.
Publisher responsibilities:
- Fetch product data from a data provider. Examples of data providers: a product generator, the API of another system (e.g. a legacy PIM), or readers for a specific file format. The data provider implemented for this demo application is FakeDataProvider (partially implemented).
- Format the product data to the standard format.
- Pack the data into batches of a given size (100 items).
- Publish the data to RabbitMQ.
Example of a message published in a product data queue:
{"identifier":"product-0","family":"clothing","enabled":false,"parent":"product-model-0","groups":[],"categories":[],"values":{"size":[{"locale":null,"scope":null,"data":"s"}]},"associations":[],"created":"2018-10-21T21:23:03+00:00","updated":"2018-10-21T21:23:03+00:00"}
{"identifier":"product-1","family":"clothing","enabled":false,"parent":"product-model-1","groups":[],"categories":[],"values":{"size":[{"locale":null,"scope":null,"data":"m"}]},"associations":[],"created":"2018-10-21T21:23:03+00:00","updated":"2018-10-21T21:23:03+00:00"}
Implementation: AmqpMessagePublisher
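The batching and serialization steps can be sketched as follows. This Python snippet is illustrative only (the demo's actual publisher is the PHP AmqpMessagePublisher), the product payloads are hypothetical, and serializing each batch as newline-delimited JSON is an assumption based on the message examples above.

```python
import json

def to_messages(products, batch_size=100):
    """Pack products into batches of at most `batch_size` items and
    serialize each batch as one message: newline-delimited JSON
    objects, one product per line."""
    messages = []
    for start in range(0, len(products), batch_size):
        batch = products[start:start + batch_size]
        messages.append("\n".join(json.dumps(p) for p in batch))
    return messages

# Hypothetical payloads in the standard format shown above.
products = [
    {"identifier": "product-%d" % i, "family": "clothing", "enabled": False}
    for i in range(250)
]
messages = to_messages(products)  # 3 messages: 100 + 100 + 50 products
```

Each resulting message can then be published to the product data queue as-is.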
Consumers receive messages from RabbitMQ by subscription.
PHP is admittedly not the best language for developing long-running processes. Moreover, the consumers don't need to implement any specific business logic, so it makes sense to use an existing tool instead of reinventing the wheel.
One widely used utility of this kind is RabbitMQ cli consumer.
Usage:
rabbitmq-cli-consumer \
--url amqp://guest:guest@rabbitmq:5672 \
--queue-name create_product_queue \
--executable "./bin/console poc:product:update --type=product" \
--strict-exit-code \
--verbose
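rabbitmq-cli-consumer invokes the given executable once per message; by default the message body is passed base64-encoded as the last argument, and with `--strict-exit-code` only exit code 0 acknowledges the message (non-zero codes signal reject/requeue; check the tool's README for the exact mapping). The demo uses the Symfony command shown above; this Python sketch only illustrates the executable's side of that contract.

```python
#!/usr/bin/env python3
import base64
import json
import sys

def handle(argv):
    """Decode the message passed by rabbitmq-cli-consumer and process it.

    Assumption: the body arrives base64-encoded as the last CLI argument
    (the tool's default behavior).
    """
    payload = base64.b64decode(argv[-1])
    product = json.loads(payload)
    # ... send `product` to the Akeneo REST API here ...
    return 0  # exit code 0 acknowledges the message

if __name__ == "__main__" and len(sys.argv) > 1:
    sys.exit(handle(sys.argv))
```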
Worker responsibilities:
- Send product batches to the PATCH products endpoint of the Akeneo REST API.
- Return the correct processing result to the consumer so that messages can be acknowledged, rejected, or re-queued.
Implementation: ApiProductBatchUpdater
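The second responsibility can be sketched like this: Akeneo's batch PATCH endpoint answers with one JSON object per line, each carrying a `status_code` for the corresponding product, and the worker maps that to a success/failure result. The snippet is illustrative (not the ApiProductBatchUpdater code), and the response lines are made up, not captured from a real server.

```python
import json

def batch_succeeded(response_body):
    """Return True only if every line of a batch PATCH response
    reports a success status code (< 400) for its product."""
    return all(
        json.loads(line)["status_code"] < 400
        for line in response_body.splitlines()
    )

# Hypothetical response bodies in the line-delimited format.
ok_response = ('{"line":1,"identifier":"product-0","status_code":204}\n'
               '{"line":2,"identifier":"product-1","status_code":201}')
failed_response = '{"line":1,"identifier":"product-0","status_code":422}'
```

A worker can turn this boolean into the exit code the consumer expects, acknowledging on success and rejecting or re-queueing otherwise.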
I ran the application in my local Docker setup (2 CPUs, 5 GB RAM).
- Dataset: 1000 product models, 5000 products.
- Consumers: 1 consumer for product model data, 3 consumers for product data.
- Total time: 6 min.
TO DO: compare execution time and memory consumption with a CSV import of the very same dataset.
- Messages in RabbitMQ are stored in a format fully compatible with the PATCH API endpoint, but the current implementation has to decode them for the Akeneo PHP API client, which then re-encodes them. The idea is to replace the PHP API client with a custom solution in order to avoid this decoding/encoding of JSON messages.
- Use supervisord and experiment with different numbers of consumers.
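A supervisord program section for this could look like the following sketch; the program name, paths, and `numprocs` value are illustrative, and `numprocs` is the knob to turn when experimenting with the number of consumers.

```ini
[program:product_consumer]
command=rabbitmq-cli-consumer --url amqp://guest:guest@rabbitmq:5672 --queue-name create_product_queue --executable "./bin/console poc:product:update --type=product" --strict-exit-code
; Run several identical consumer processes against the same queue.
process_name=%(program_name)s_%(process_num)02d
numprocs=3
autorestart=true
```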