K6: Clustering and distributed execution

This is something that isn't top priority at the moment, but it's going to take a lot of design work, so I'd like to get the ball rolling on the actual planning.

My current idea is to have a k6 hive command (name subject to change), which hooks up to etcd - a lovely piece of software that can handle most of the heavy lifting around clustering, and is also the backbone that makes, among other things, Kubernetes tick.

Each instance registers itself, along with how many VUs it can handle, exposes an API to talk to the cluster, and triggers a leader election. The information registered in etcd might have a structure like:

  • /loadimpact/k6/nodes/nodeA - node data (JSON)
  • /loadimpact/k6/nodes/nodeB - node data (JSON)
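
For illustration, here's a rough sketch (not actual k6 code) of what that registration and leader election could look like against etcd with the official Go client. The key paths follow the layout above; the NodeData shape, the node name and the addresses are made up for the example.

```go
// Hypothetical sketch of node registration and leader election via etcd.
package main

import (
	"context"
	"encoding/json"
	"log"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

type NodeData struct {
	Address string `json:"address"` // API address other nodes can reach
	MaxVUs  int    `json:"maxVUs"`  // how many VUs this node can handle
}

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()
	ctx := context.Background()

	// Register the node under a lease, so its key disappears if the node dies.
	lease, err := cli.Grant(ctx, 10) // 10s TTL
	if err != nil {
		log.Fatal(err)
	}
	data, _ := json.Marshal(NodeData{Address: "https://a.node.address/", MaxVUs: 1000})
	if _, err := cli.Put(ctx, "/loadimpact/k6/nodes/nodeA", string(data),
		clientv3.WithLease(lease.ID)); err != nil {
		log.Fatal(err)
	}
	// In real code the returned keep-alive channel should be drained.
	if _, err := cli.KeepAlive(ctx, lease.ID); err != nil {
		log.Fatal(err)
	}

	// Trigger a leader election; Campaign blocks until this node becomes leader.
	session, err := concurrency.NewSession(cli, concurrency.WithTTL(10))
	if err != nil {
		log.Fatal(err)
	}
	election := concurrency.NewElection(session, "/loadimpact/k6/leader")
	if err := election.Campaign(ctx, "nodeA"); err != nil {
		log.Fatal(err)
	}
	log.Print("this node is now the leader")
}
```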

Running a test on the cluster is a matter of calling k6 run --remote=https://a.node.address/ script.js. Instead of running the test locally, this rolls up all the data and files needed and pushes them to the cluster, where they're stored in etcd - available from each node.

  • /loadimpact/k6/test - test data (JSON)
  • /loadimpact/k6/test/src/... - root for the pushed filesystem

When test data is loaded, each node instantiates an engine and its maximum number of VUs right away, and watches its own registry information for changes. The elected leader then takes care of patching the other nodes to distribute VUs evenly across the cluster.
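
A companion fragment (reusing the cli client and the NodeData type from the sketch above, so not self-contained on its own) of how a node could watch its own registry key for the changes the leader pushes:

```go
// Hypothetical continuation of the sketch above: each node watches its own
// key and reacts when the leader patches it.
func watchOwnKey(ctx context.Context, cli *clientv3.Client, key string) {
	for resp := range cli.Watch(ctx, key) {
		for _, ev := range resp.Events {
			var data NodeData
			if err := json.Unmarshal(ev.Kv.Value, &data); err != nil {
				log.Printf("bad node data on %s: %v", key, err)
				continue
			}
			// Here the local engine would be scaled up or down to whatever
			// the leader assigned to this node.
			log.Printf("%s updated, now targeting %d VUs", key, data.MaxVUs)
		}
	}
}
```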

liclac · 7 Mar 2017

All comments

WRT #202 / request for comments:

I don't have direct experience with etcd, so I don't have an opinion there, but the notion of using existing, proven software is clearly sound, so I'd welcome that.

My initial thought is: how would someone know how many VUs a machine can handle? If there's an automated estimate based on system resources (though obviously, e.g., a CPU core on AWS is definitely not equal to a CPU core on dedicated hardware), that would at least provide consistency, which would be good. I'm aware, though, that it'd be quite easy to overload a load generator with work and thus skew its output, as it would lack the system resources to measure accurately. However it's implemented, I would imagine it's going to be necessary to allow users to set/amend the VU capability, and a good user guide - i.e. one defining a way in which users can calculate/estimate it - would help a lot. That might actually be the best way to start: keep it simple and iterate/add from there.

neilstuartcraig · 8 May 2017

Honestly, your best shot is probably trial and error. The limiting factor is typically not CPU power, but rather local socket usage and, to a lesser extent, RAM usage, both of which vary slightly between scripts. A good start would be to just split your desired number of VUs across as many hosts as you want and see if it flies or not.

liclac · 9 May 2017

Linux has a maximum of 64K ephemeral ports. That's your connection limit. I'd also kernel-tune TIME_WAIT, etc.: http://gatling.io/docs/current/general/operations/#id3.
Telegraf should also have a minimal footprint: https://github.com/influxdata/telegraf

ghost · 10 May 2017

Thanks for this, I am very interested!

I was wondering if there would be a way to avoid adding a new service to the pool.

The need for extra shared metadata is going to be there, because you would probably want to direct the load test output to a single InfluxDB instance, so maybe we could save this kind of metadata directly there?

I know that this would force folks to stick with InfluxDB, but if we used etcd, people would in any case need to custom-tailor something to collect and aggregate results.

arichiardi · 30 Nov 2017

@arichiardi I think it's more important that we look at how we can best implement this, using all available tools, rather than looking at how to minimise dependencies right off the bat. I'm not saying we should introduce dependencies for the sake of it, but we want this done right.

The current requirements for this to be implemented are as follows:

Prerequisite: Leader assignment

Most of the below requirements have one prerequisite: we need a central point to make all decisions from. The leader doesn't need a lot of processing power; it just needs to keep an eye on things, so to speak.

  1. The initiating client acts as the brain.

    This is the absolute simplest solution, with the drawback that the test will have to be aborted if the client loses connection.

  2. Dedicated master, à la Kubernetes.

    Second simplest. Needs a coordinating process that can communicate with all instances, either directly or through an indirection layer (e.g. etcd, Redis, etc.). Because this can be expected to run in the cluster, and we have a persistence mechanism for the test, we could technically recover from a master failure/network hiccup by just skipping the chunk of the timeline that never passed.

  3. Leader election.

    Fortunately, this is not an algorithm we have to implement ourselves (etcd and similar provide it with a single function call), but it does add a hard dependency on something that can provide it. If one of the instances in a cluster can be dynamically elected as the acting master, it would theoretically simplify deployment; I just have a bad feeling it might open a can of worms of synchronisation bugs.

Spreading VUs across instances.

The algorithm for this could simply be to spread VUs evenly across all available instances, respecting their caps. We could possibly do some weighting; e.g. between an instance with max 1000 VUs and one with max 2000 VUs, the latter could get 2x as many VUs allocated to it.
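
A rough sketch of that weighting idea (purely illustrative, not k6 code) could look like this:

```go
package main

import "fmt"

// spreadVUs splits the requested number of VUs across instances in proportion
// to each instance's cap, without exceeding any cap.
func spreadVUs(total int, caps []int) []int {
	capacity := 0
	for _, c := range caps {
		capacity += c
	}
	alloc := make([]int, len(caps))
	if capacity == 0 {
		return alloc
	}
	if total > capacity {
		total = capacity // can't allocate more than the cluster can handle
	}
	remaining := total
	for i, c := range caps {
		alloc[i] = total * c / capacity // weighted share, rounded down
		remaining -= alloc[i]
	}
	// Hand the rounding leftovers (at most one per instance) to instances
	// that still have headroom.
	for i := 0; remaining > 0 && i < len(caps); i++ {
		if alloc[i] < caps[i] {
			alloc[i]++
			remaining--
		}
	}
	return alloc
}

func main() {
	// An instance with max 2000 VUs gets roughly 2x the share of one with max 1000 VUs.
	fmt.Println(spreadVUs(900, []int{1000, 2000})) // [300 600]
}
```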

Possible implementations I can see:

  1. All instances hold persistent connections to the master.

    I don't like this. Connections break. It's real simple though.

  2. Key-Value Observing.

    All instances register themselves in a control plane of some kind (etcd, Consul, Redis), then watch their own keyspaces. The master node updates them. Reliability is offloaded to the control plane; we don't have to worry about it.

Central execution of thresholds, from a data source.

This would be fairly simple using something like InfluxDB; we can parse threshold snippets for the variables they refer to (there's some code for that already), then query them out of the database, using the starting timestamp of the test as a delimiter.

We could do something with shipping samples back to the master, but that feels... a little silly.
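
To make the idea concrete, here's a hedged sketch of evaluating a threshold like p(95) < 500 by querying InfluxDB's 1.x /query HTTP API directly. It assumes k6's InfluxDB output has been writing http_req_duration samples with a value field into a database named k6 - adjust the names and address to your setup.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"net/url"
	"time"
)

// queryScalar runs an InfluxQL query and returns the first scalar value of the
// first returned series.
func queryScalar(influxAddr, db, q string) (float64, error) {
	u := fmt.Sprintf("%s/query?db=%s&q=%s", influxAddr, url.QueryEscape(db), url.QueryEscape(q))
	resp, err := http.Get(u)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()

	var out struct {
		Results []struct {
			Series []struct {
				Values [][]interface{} `json:"values"`
			} `json:"series"`
		} `json:"results"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return 0, err
	}
	if len(out.Results) == 0 || len(out.Results[0].Series) == 0 ||
		len(out.Results[0].Series[0].Values) == 0 {
		return 0, fmt.Errorf("no data for query %q", q)
	}
	v, ok := out.Results[0].Series[0].Values[0][1].(float64)
	if !ok {
		return 0, fmt.Errorf("unexpected value type for query %q", q)
	}
	return v, nil
}

func main() {
	testStart := time.Now().Add(-10 * time.Minute) // the starting timestamp of the test

	// Only look at samples written after the test started.
	q := fmt.Sprintf(`SELECT PERCENTILE("value", 95) FROM "http_req_duration" WHERE time >= %d`,
		testStart.UnixNano())
	p95, err := queryScalar("http://localhost:8086", "k6", q)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("p(95) = %.2fms, threshold passed: %v\n", p95, p95 < 500)
}
```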

Distributed rate limiting.

The --rps flag, and possible future rate limits, need to be distributed to work properly.
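
A tiny sketch of what "distributed" could mean here: each instance enforces only its proportional share of the global --rps budget, based on the fraction of VUs it was allocated (the numbers below are made up; golang.org/x/time/rate does the actual limiting):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	globalRPS := 10.0             // what the user asked for with --rps (kept small for the demo)
	instanceFraction := 1.0 / 3.0 // this instance's share of the cluster's VUs

	// Each instance only enforces its own share of the global rate.
	limiter := rate.NewLimiter(rate.Limit(globalRPS*instanceFraction), 1)

	ctx := context.Background()
	start := time.Now()
	for i := 0; i < 10; i++ {
		if err := limiter.Wait(ctx); err != nil { // block until a token is available
			panic(err)
		}
		fmt.Printf("request %d at %v\n", i, time.Since(start).Round(time.Millisecond))
	}
}
```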

Distributed data storage.

We need to be able to store two kinds of things:

  • Archive data - script files, static files, options
  • Runtime data like the fixed seed used for the init context, setup data (#194), etc

This can be anything that can store keys and values of arbitrary size.

liclac · 20 Dec 2017

Have you considered implementing something similar to what was done for Locust?

They have a master/slave architecture where the synchronization happens via ZMQ (TCP), which is lightweight enough.

One advantage, in this case, is that there is no need to introduce a hard dependency. The master/slave synchronization can be implemented via ZMQ, HTTP or whatever network protocol you might consider.

IMHO, the only disadvantage of the Locust implementation is the fact that it is a stateful system, where the master must always be started first and can't recover if a slave disappears and then comes back.

I would rather see a stateless system that can handle connectivity issues gracefully.

albertowar · 16 Oct 2018

@coderlifter, thanks, we still haven't finalized the k6 distributed execution design yet, so we'll definitely consider this approach when we get to it. We'll post a final design/RFC here when we start implementing this, so it can be discussed by any interested people.

na-- · 19 Oct 2018

Any progress on internal discussions? This is a killer feature that is pushing users more towards Python at the moment.

GTB3NW · 14 Jan 2019

@GTB3NW, sorry, we haven't started specifically implementing this functionality yet. The next major feature we're currently working on is arrival-rate based execution support, i.e. being able to schedule the execution in terms of iterations (requests) per second. As a part of the refactoring we're doing for that, we'll also improve some k6 internals in a way that would facilitate an easier implementation of distributed execution as well, so we're slowly getting to that point, but we're not there yet.

na-- · 15 Jan 2019

> Linux has a maximum of 64K ephemeral ports. That's your connection limit. I'd also kernel-tune TIME_WAIT, etc.: http://gatling.io/docs/current/general/operations/#id3.
> Telegraf should also have a minimal footprint: https://github.com/influxdata/telegraf

You should be able to set up n(IPs) * ~65K(ports) sockets, since the source of a packet is src IP + src port.
You need to create virtual IP addresses for that.

efology · 25 Apr 2019

Any updates on this issue? Very keen to be able to run k6 in a distributed manner.

qcastel · 25 Apr 2020

We are currently finishing a big refactoring of the core of how script execution works in k6, which will be foundational for native distributed execution, among other things. You can see some details in https://github.com/loadimpact/k6/pull/1007 (specifically, the execution segments part of it, https://github.com/loadimpact/k6/issues/997).

This would allow you to partition a load test among however many instances you require, without any synchronization between them. For example, if you want to split a test between 3 instances, you would be able to do something like this when #1007 is merged:

k6 run --execution-segment "0:1/3" --execution-segment-sequence "0,1/3,2/3,1" my-script.js
k6 run --execution-segment "1/3:2/3" --execution-segment-sequence "0,1/3,2/3,1" my-script.js
k6 run --execution-segment "2/3:1" --execution-segment-sequence "0,1/3,2/3,1" my-script.js

Each instance will execute only its own piece of the puzzle. To start the tests synchronously, you can use the --paused flag and the REST API to start the test run. Though, for now, you'd still need a centralized metrics store if you want to explore the generated metrics, and things like thresholds won't work in a distributed manner. To have that working, we'd still need to implement https://github.com/loadimpact/k6/issues/763 and some synchronization between the instances...
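
For example, here's a rough sketch of resuming several --paused instances at (roughly) the same time through their REST APIs. The instance addresses are placeholders, and it assumes the default REST API port (6565) and the /v1/status endpoint - check the k6 REST API docs for the exact payload of your k6 version.

```go
package main

import (
	"bytes"
	"log"
	"net/http"
)

func main() {
	instances := []string{ // one k6 REST API address per instance (placeholders)
		"http://load-gen-1:6565",
		"http://load-gen-2:6565",
		"http://load-gen-3:6565",
	}
	// Flip the paused attribute to false to start the test run.
	body := []byte(`{"data":{"type":"status","id":"default","attributes":{"paused":false}}}`)

	for _, addr := range instances {
		req, err := http.NewRequest(http.MethodPatch, addr+"/v1/status", bytes.NewReader(body))
		if err != nil {
			log.Fatal(err)
		}
		req.Header.Set("Content-Type", "application/json")
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			log.Fatalf("failed to resume %s: %v", addr, err)
		}
		resp.Body.Close()
		log.Printf("resumed %s: %s", addr, resp.Status)
	}
}
```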

na-- · 27 Apr 2020

> Linux has a maximum of 64K ephemeral ports. That's your connection limit. I'd also kernel-tune TIME_WAIT, etc.: http://gatling.io/docs/current/general/operations/#id3.
> Telegraf should also have a minimal footprint: https://github.com/influxdata/telegraf

@ghost @efology
It's actually possible to create far more connections than 64k per client, because the limitation for TCP connections is on the tuple of:

{local IP address, local port, remote IP address, remote port}

https://tools.ietf.org/html/bcp156#section-2.2

The local IP address is likely singular, and the remote port is likely constant.
The local port is probably limited to ~64,511 (assuming you expand the defaults but still exclude 0-1024).
The remote IP address of the target may not be singular and is particularly likely to resolve to multiple IPs if it is a redundant system.

e.g. Just looking at the public hostname of one of my services hosted by AWS, the DNS resolves to 4 IP addresses. In theory one k6 client machine configured to allow 64,511 ephemeral ports per remote IP would be able to create 258,044 concurrent connections to it.

As you say @efology you could also configure a client with multiple IPs and multiply the potential connections that way.

This could be particularly useful if you want to load test a WebSocket back end, where you have large numbers of idle connections.

You might start to hit other limitations like CPU and memory at those higher connection counts too, depending on the internals of how k6 manages connections.

luketn · 8 Aug 2020

Side note to this: as a short-term stopgap solution, you can squeeze more sockets out of a machine by load balancing across multiple IPs, subject to your network topology allowing this. Go actually makes this fairly trivial to implement - just set net.Dialer.LocalAddr.

This is trivial in IPv6 setups that assign prefixes to machines, e.g. my laptop currently has a /64 subset of this flat's /48. For IPv4 networks, it more commonly takes some fiddling with your interface configuration and adding discrete IPs - but you can query it all the same, with net.Interface.Addrs().

(Neither this nor distributed execution helps if you're trying to load test through a NAT with only a single public IP between you and your destination host; that's your bottleneck in that case.)
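
A bare-bones illustration of the net.Dialer.LocalAddr trick (the IPs below are placeholders - they'd have to actually be assigned to your machine for the dials to succeed):

```go
package main

import (
	"fmt"
	"net"
)

func main() {
	localIPs := []string{"192.0.2.10", "192.0.2.11"} // addresses assigned to this host

	for i := 0; i < 4; i++ {
		// Round-robin the source address; leaving the port at 0 lets the
		// kernel pick an ephemeral port per connection.
		dialer := &net.Dialer{
			LocalAddr: &net.TCPAddr{IP: net.ParseIP(localIPs[i%len(localIPs)])},
		}
		conn, err := dialer.Dial("tcp", "example.com:80")
		if err != nil {
			fmt.Println("dial failed:", err)
			continue
		}
		fmt.Println("connected from", conn.LocalAddr())
		conn.Close()
	}
}
```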

liclac · 8 Aug 2020

@liclac, this is what is used in all the multi-NIC implementations, one of which will hopefully get into v0.28.0 ;). And it really does work if you have multiple IPs/NICs.

MStoykov · 10 Aug 2020

Here is an idea:

What does "clustering" mean for k6? As far as I can tell, there are two kinds of tasks involved in k6:

  • distribute jobs across a set of executors
  • report metrics back

I'm not intimately familiar with how those are implemented in k6 (and if those are the only tasks), but they don't seem to require an active cluster membership.

For distributing load tasks, something like this could work quite well: https://www.temporal.io/

sagikazarmark · 11 Oct 2020

Any updates on this issue? This is a killer feature for us.

li-zhixin · 28 Jan 2021

@li-zhixin, as evidenced by https://github.com/loadimpact/k6/issues/140#issuecomment-619760370, when we have updates to give, we share them in the issue :wink: Now that we have execution segments, you can launch distributed tests with the minimal amount of synchronization work described in that comment, though, unfortunately, without thresholds and a centralized summary. You can use an external output for those, of course, but it's certainly not a turnkey solution.

Looking at the thread afterwards, I guess it's worth mentioning that the multi-NIC support landed in k6 v0.29.0 instead of k6 v0.28.0.

For now, you can follow https://github.com/loadimpact/k6/issues/763, since it's the next prerequisite on the list, before we can properly work on the missing synchronization parts of this issue.

na-- · 28 Jan 2021

I guess we should also mention that @simskij built https://github.com/k6io/operator, which is a way to run k6 tests inside of Kubernetes, again without thresholds and the summary.

MStoykov · 28 Jan 2021