Points loss using batch processing #163

Closed
FlavioF opened this issue Apr 19, 2016 · 19 comments

@FlavioF commented Apr 19, 2016

We are using the influxdb-java client 2.2 (this happens with 2.1 too) and we are experiencing some data loss.

// creating the InfluxDB connection
InfluxDB connection = InfluxDBFactory
        .connect(this.influxdbHostUrl, this.influxdbUser, this.influxdbPwd)
        .enableBatch(5000, 30, TimeUnit.SECONDS);

// ... a lot of other stuff

// sending a lot of data
log.info("Writing new series {}", data);
connection.write(Constants.STATSLOG_DB_NAME,
        "default",
        Point.measurement(s.getKey())
                .time(s.getTimeInMillis(), TimeUnit.MILLISECONDS)
                .field("value", data.getMessage().getValue())
                .tag("type", data.getMessage().getType().name())
                .build());

In the above example, when we send (let's say) 1k points of the same measurement, we get 1k log entries; however, sometimes we only have 999 points for that measurement in InfluxDB.
There are no errors in the InfluxDB log.

Can it be anything related to java client batch processing?

I am happy to help debug the problem or even fix it if needed.

Thank you in advance.

@andrewdodd (Contributor) commented Apr 19, 2016

Yes, the current BatchProcessor implementation will silently drop batched points if they do not send successfully. Would you be willing to:
a) Test the changes from the #108 PR branch? OR
b) Test the master branch soon after I merge the #108 PR into the master branch?
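For context, a minimal sketch of the failure mode being described here (hypothetical class and method names, not the actual BatchProcessor code): if the flush path swallows the write exception and clears the batch regardless, every point in that batch is silently lost.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the "silent drop" failure mode; not the real
// BatchProcessor implementation.
public class LossyBatcher {
    private final List<String> batch = new ArrayList<>();
    private int written = 0;
    private int dropped = 0;

    public void add(String point) {
        batch.add(point);
    }

    // Simulates a flush; 'connectionUp' stands in for whether the HTTP write succeeds.
    public void flush(boolean connectionUp) {
        try {
            if (!connectionUp) {
                throw new RuntimeException("write failed");
            }
            written += batch.size();
        } catch (RuntimeException e) {
            // The error is swallowed here, so the caller never learns the send failed...
            dropped += batch.size();
        } finally {
            // ...and the batch is cleared either way, so the failed points are gone.
            batch.clear();
        }
    }

    public int written() { return written; }
    public int dropped() { return dropped; }
}
```

With this shape, a single failed flush loses the whole batch without any signal to the writer, which matches the symptom reported above (fewer points stored than log entries written, with nothing in the logs).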

@FlavioF (Author) commented Apr 19, 2016

Yes, I can test it. No problem. Tomorrow I will test the PR and let you know the result of it.

@andrewdodd (Contributor)

Ha! Great news. I've been hoping someone (other than me) can see if it works before I commit to merging it.

@FlavioF (Author) commented Apr 20, 2016

I ran my application using your patch and there is still data loss.
Furthermore, I added a comment to a line in InfluxDBImpl in your patch. Take a look at it.

@andrewdodd (Contributor)

@FlavioF thanks for the comment, you are right that the default there should be null instead of 0. I will change it.

Can I enquire: are you running with a capacity limit on your batch processor or not? (I have really only used the capacity-limited form, as I am uncomfortable with the unbounded capacity / capacity-based-on-JVM-memory-limits option.)

@FlavioF (Author) commented Apr 20, 2016

I ran the tests without a capacity limit. Do you want me to run it with a limit?

@andrewdodd (Contributor)

It would be great if you could.

(Just so we are on the same page: if you are running into the JVM memory limit, you are going to lose data somewhere. There is nothing we can do about that.)

@FlavioF (Author) commented Apr 20, 2016

Ok, I get it. I will retry the test with a limit tomorrow morning.

@FlavioF (Author) commented Apr 21, 2016

Sorry, will test it only on next Tuesday.

@larrywalker

Did you guys make any progress on this last week? I was away for a few days.

@larrywalker

I am in a good spot to do some testing with this over the next few days. How is the patch looking?

@FlavioF (Author) commented May 11, 2016

No progress here, sorry. I had no time for more tests in the past few days. I will get back to it ASAP.

@andrewdodd (Contributor) commented May 11, 2016

@larrywalker I have been running this patch for 9 months or so, but have been waiting to get someone else to see if it works for them.

My use case is configured with a limited buffering capacity (100,000 points I think, or maybe 500k) and will discard the 'oldest' buffered points when the limit is reached.

I'm almost about to merge this PR into the master branch anyway (I'm sick of having it unmerged, and I don't think it is any worse than the existing code; but it could be a little more dangerous, hence my delay).

Any feedback would be great.

@FlavioF thanks!

@jazdw commented May 13, 2016

@andrewdodd are you the maintainer now? To be clear, does PR #108 implement a circular buffer to retain a certain number of data points when the connection is lost?

How do you configure the amount of points to hang onto?

@andrewdodd (Contributor) commented May 13, 2016

Hi @jazdw, I am 'also' a maintainer now.

I guess one of the config options from PR #108 turns the buffer into a circular buffer (DROP_OLDEST).

Unfortunately the javadoc is not as clear as it could be, but the enableBatch() method with lots of parameters has an explanation. See here

In general, there are a few options for configuration:

  • The overall capacity of the buffer (either a number or JVM memory-bound)
  • The desired behaviour when the 'buffer is full'.
  • Options for triggering a batched write (i.e. flushing the buffer):
    • Time based trigger (i.e. flush at least every N seconds, regardless of how many items buffered)
    • Number of actions trigger (i.e. if N points are buffered quickly flush them, regardless of the time-based trigger)

There is also a slightly confusing config option maxBatchWriteSize. This is used to control the max number of points that are sent in one batch.

An example config might be:

  • Capacity = 100
  • Flush interval = 5 secs
  • Flush actions = 20
  • Behaviour = DISCARD_OLDEST
  • Max batch write size = 50

This would result in the following behaviours:

  • Every 5 secs, we would try to flush everything in the buffer
  • If we got 20+ points written quickly (i.e. faster than the 5 sec cadence), we would do a flush.
  • If 1 point per sec was being written and the connection was down for 1 minute - we would have buffered 60 points AND would have tried to flush unsuccessfully a number of times. IF the connection came back, we would write 50 points in the first batch.
  • If 1 point per sec was being written and the connection was down for 2 minutes - we would have buffered 100 points. Each time a new point is added, we would discard the oldest one.
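The DISCARD_OLDEST and max-batch-write-size behaviours described above can be sketched as a simple bounded deque. This is only an illustration of the semantics, under the assumption that the buffer works roughly as the bullets describe; the class and method names are made up and this is not the PR #108 implementation:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustration of DISCARD_OLDEST plus maxBatchWriteSize semantics.
// Hypothetical names; not the PR #108 implementation.
public class DropOldestBuffer {
    private final Deque<String> buffer = new ArrayDeque<>();
    private final int capacity;

    public DropOldestBuffer(int capacity) {
        this.capacity = capacity;
    }

    public void add(String point) {
        if (buffer.size() == capacity) {
            // Buffer full: discard the oldest buffered point to make room.
            buffer.pollFirst();
        }
        buffer.addLast(point);
    }

    // On a successful flush, at most maxBatchWriteSize points are sent in one batch.
    public List<String> drain(int maxBatchWriteSize) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxBatchWriteSize && !buffer.isEmpty()) {
            batch.add(buffer.pollFirst());
        }
        return batch;
    }

    public int size() {
        return buffer.size();
    }

    public String oldest() {
        return buffer.peekFirst();
    }
}
```

With capacity 100 and a 2-minute outage at 1 point/sec, the first 20 points are discarded one by one as points 101-120 arrive; once the connection returns, the first drain of 50 sends the oldest surviving points.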

@Daniel700

Hi,
can you check if PR #177 works for you?
I think the data loss is a problem if you send field values containing certain strings.

@csokol (Contributor) commented Dec 21, 2016

Any plans on working on this?

I've created a simple main to check if batching is suitable for our use and it doesn't seem reliable:

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;

public class Foo {
    public static void main(String[] args) throws InterruptedException {
        InfluxDB influx = InfluxDBFactory.connect("http://127.0.0.1:8086", "root", "");
        influx.enableBatch(2000, 1, SECONDS);

        for (int i = 0; i < 10000; i++) {
            Point point = Point.measurement("bug")
                    .time(System.currentTimeMillis(), MILLISECONDS)
                    .addField("value", 1)
                    .build();

            influx.write("test", "autogen", point);
        }

        Thread.sleep(2000);
        influx.close();
    }
}

When I check the database after running this code, I see there's a lot of data lost:

> select count(*) from bug
name: bug
time	count_value
----	-----------
0	42

I'm considering not using this feature in production and instead creating my own implementation of batching. What do you think? Is this feature production-ready? Maybe it should be deprecated, or at least have a big warning in the README mentioning that it is not reliable.

@csokol (Contributor) commented Dec 21, 2016

My example didn't make any sense, sorry. Since it was using the current timestamp, most of the points obviously had the same timestamp, so they overwrote each other. Here's a realistic example, which works as expected:

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;

public class Foo {
    public static void main(String[] args) throws InterruptedException {
        InfluxDB influx = InfluxDBFactory.connect("http://127.0.0.1:8086", "root", "");
        influx.enableBatch(100, 1, SECONDS);

        for (int i = 0; i < 10000; i++) {
            Point point = Point.measurement("bug")
                    .time(i, MILLISECONDS) // distinct timestamps, so no points overwrite each other
                    .addField("value", 1)
                    .build();

            influx.write("test", "autogen", point);
        }

        Thread.sleep(2000);
        influx.close();
    }
}
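The overwrite behaviour described above follows from InfluxDB's point identity: a point with the same measurement, tag set, and timestamp replaces the previous one. A rough in-memory model of that rule (hypothetical class and method names, purely for illustration; the real deduplication happens server-side in InfluxDB):

```java
import java.util.HashMap;
import java.util.Map;

// Rough in-memory model of InfluxDB point identity: the last write for a given
// (measurement, tag set, timestamp) key replaces any earlier one.
public class PointIdentityModel {
    private final Map<String, Integer> series = new HashMap<>();

    public void write(String measurement, String tags, long timeMillis, int value) {
        // Same key => overwrite, which is why the first example stored only one row.
        series.put(measurement + "," + tags + "@" + timeMillis, value);
    }

    public int count() {
        return series.size();
    }
}
```

Under this model, writing 10,000 points that share one timestamp leaves a single stored point, while writing points with distinct timestamps (as in the corrected example) stores them all.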

@majst01 (Collaborator) commented Dec 21, 2016

Nice to see. I will close this again.

@majst01 majst01 closed this as completed Dec 21, 2016