From 9da3171853adcbb604cbdd73fa17d54176d4ef7a Mon Sep 17 00:00:00 2001 From: Sebastian Imlay Date: Fri, 20 May 2022 16:22:04 -0400 Subject: [PATCH 01/18] Intial stuff for kafka source and sink --- Cargo.lock | 617 ++++++++++++--------- Cargo.toml | 2 + rust-connectors/common/src/opt.rs | 13 +- rust-connectors/sinks/dynamodb/src/main.rs | 2 +- rust-connectors/sinks/kafka/CHANGELOG.md | 3 + rust-connectors/sinks/kafka/Cargo.toml | 20 + rust-connectors/sinks/kafka/README.md | 10 + rust-connectors/sinks/kafka/src/main.rs | 70 +++ rust-connectors/sinks/slack/src/main.rs | 2 +- 9 files changed, 469 insertions(+), 270 deletions(-) create mode 100644 rust-connectors/sinks/kafka/CHANGELOG.md create mode 100644 rust-connectors/sinks/kafka/Cargo.toml create mode 100644 rust-connectors/sinks/kafka/README.md create mode 100644 rust-connectors/sinks/kafka/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index cc7fc86d..5b19a80c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -245,9 +245,9 @@ dependencies = [ [[package]] name = "async-process" -version = "1.3.0" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83137067e3a2a6a06d67168e49e68a0957d215410473a740cea95a2425c0b7c6" +checksum = "cf2c06e30a24e8c78a3987d07f0930edf76ef35e027e7bdb063fccafdad1f60c" dependencies = [ "async-io", "blocking", @@ -363,7 +363,7 @@ dependencies = [ "pin-project-lite 0.2.9", "rustls-native-certs 0.6.2", "tokio", - "tokio-rustls 0.23.3", + "tokio-rustls 0.23.4", "tungstenite", ] @@ -420,7 +420,7 @@ dependencies = [ "aws-types", "bytes", "hex", - "http 0.2.6", + "http 0.2.7", "hyper", "ring", "tokio", @@ -437,7 +437,7 @@ checksum = "5279590d48e92b287f864e099c7e851af03a5e184a57cec0959872cee297c7a0" dependencies = [ "aws-smithy-http", "aws-types", - "http 0.2.6", + "http 0.2.7", "regex", "tracing", ] @@ -451,7 +451,7 @@ dependencies = [ "aws-smithy-http", "aws-smithy-types", "aws-types", - "http 0.2.6", + "http 0.2.7", "lazy_static", "percent-encoding", "tracing", @@ -475,7 +475,7 @@ dependencies = [ "aws-types", "bytes", "fastrand", - "http 0.2.6", + "http 0.2.7", "tokio-stream", "tower", ] @@ -497,7 +497,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "http 0.2.6", + "http 0.2.7", "tokio-stream", "tower", ] @@ -520,7 +520,7 @@ dependencies = [ "aws-smithy-xml", "aws-types", "bytes", - "http 0.2.6", + "http 0.2.7", "tower", ] @@ -533,7 +533,7 @@ dependencies = [ "aws-sigv4", "aws-smithy-http", "aws-types", - "http 0.2.6", + "http 0.2.7", "thiserror", "tracing", ] @@ -547,7 +547,7 @@ dependencies = [ "aws-smithy-http", "form_urlencoded", "hex", - "http 0.2.6", + "http 0.2.7", "once_cell", "percent-encoding", "regex", @@ -580,7 +580,7 @@ dependencies = [ "aws-smithy-types", "bytes", "fastrand", - "http 0.2.6", + "http 0.2.7", "http-body", "hyper", "hyper-rustls", @@ -602,14 +602,14 @@ dependencies = [ "bytes", "bytes-utils", "futures-core", - "http 0.2.6", + "http 0.2.7", "http-body", "hyper", "once_cell", "percent-encoding", "pin-project", "tokio", - "tokio-util 0.6.9", + "tokio-util 0.6.10", "tracing", ] @@ -621,7 +621,7 @@ checksum = "101a2e213acebe624cfb9bfc944de5e33c849e0df0f09c3d3aa3b54368dbe7af" dependencies = [ "aws-smithy-http", "bytes", - "http 0.2.6", + "http 0.2.7", "http-body", "pin-project", "tower", @@ -694,7 +694,7 @@ dependencies = [ "cfg-if 1.0.0", "libc", "miniz_oxide", - "object 0.28.3", + "object 0.28.4", "rustc-demangle", ] @@ -827,7 +827,7 @@ version = "7.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c408da54db4c50d4693f7e649c299bc9de9c23ead86249e5368830bb32a734b" dependencies = [ - "semver 1.0.7", + "semver 1.0.9", "serde", "toml", "url", @@ -860,11 +860,13 @@ version = "0.4.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" dependencies = [ + "js-sys", "libc", "num-integer", "num-traits", "serde", "time 0.1.44", + "wasm-bindgen", "winapi", ] @@ -992,18 +994,18 @@ checksum = "dcb25d077389e53838a8158c8e99174c5a9d902dee4904320db714f3c653ffba" [[package]] name = "cranelift-bforest" -version = "0.82.3" +version = "0.83.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38faa2a16616c8e78a18d37b4726b98bfd2de192f2fdc8a39ddf568a408a0f75" +checksum = "ed44413e7e2fe3260d0ed73e6956ab188b69c10ee92b892e401e0f4f6808c68b" dependencies = [ "cranelift-entity", ] [[package]] name = "cranelift-codegen" -version = "0.82.3" +version = "0.83.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26f192472a3ba23860afd07d2b0217dc628f21fcc72617aa1336d98e1671f33b" +checksum = "0b5d83f0f26bf213f971f45589d17e5b65e4861f9ed22392b0cbb6eaa5bd329c" dependencies = [ "cranelift-bforest", "cranelift-codegen-meta", @@ -1018,33 +1020,33 @@ dependencies = [ [[package]] name = "cranelift-codegen-meta" -version = "0.82.3" +version = "0.83.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f32ddb89e9b89d3d9b36a5b7d7ea3261c98235a76ac95ba46826b8ec40b1a24" +checksum = "6800dc386177df6ecc5a32680607ed8ba1fa0d31a2a59c8c61fbf44826b8191d" dependencies = [ "cranelift-codegen-shared", ] [[package]] name = "cranelift-codegen-shared" -version = "0.82.3" +version = "0.83.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01fd0d9f288cc1b42d9333b7a776b17e278fc888c28e6a0f09b5573d45a150bc" +checksum = "c961f85070985ebc8fcdb81b838a5cf842294d1e6ed4852446161c7e246fd455" [[package]] name = "cranelift-entity" -version = "0.82.3" +version = "0.83.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e3bfe172b83167604601faf9dc60453e0d0a93415b57a9c4d1a7ae6849185cf" +checksum = "2347b2b8d1d5429213668f2a8e36c85ee3c73984a2f6a79007e365d3e575e7ed" dependencies = [ "serde", ] [[package]] name = "cranelift-frontend" -version = "0.82.3" +version = "0.83.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a006e3e32d80ce0e4ba7f1f9ddf66066d052a8c884a110b91d05404d6ce26dce" +checksum = "4cbcdbf7bed29e363568b778649b69dabc3d727256d5d25236096ef693757654" dependencies = [ "cranelift-codegen", "log", @@ -1054,9 +1056,9 @@ dependencies = [ [[package]] name = "cranelift-native" -version = "0.82.3" +version = "0.83.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "501241b0cdf903412ec9075385ac9f2b1eb18a89044d1538e97fab603231f70c" +checksum = "8f4cdf93552e5ceb2e3c042829ebb4de4378492705f769eadc6a7c6c5251624c" dependencies = [ "cranelift-codegen", "libc", @@ -1065,9 +1067,9 @@ dependencies = [ [[package]] name = "cranelift-wasm" -version = "0.82.3" +version = "0.83.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16d9e4211bbc3268042a96dd4de5bd979cda22434991d035f5f8eacba987fad2" +checksum = "d8b859d8cb1806f9ad0f59fdc25cbff576345abfad84c1aba483dd2f8e580e5c" dependencies = [ "cranelift-codegen", "cranelift-entity", @@ -1079,6 +1081,21 @@ dependencies = [ "wasmtime-types", ] +[[package]] +name = "crc" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49fc9a695bca7f35f5f4c15cddc84415f66a74ea78eef08e90c5024f2b540e23" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccaeedb56da03b09f598226e25e80088cb4cd25f316e6e4df7d695f0feeb1403" + [[package]] name = "crc32c" version = "0.6.3" @@ -1235,16 +1252,6 @@ dependencies = [ "syn", ] -[[package]] -name = "dashmap" -version = "4.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c" -dependencies = [ - "cfg-if 1.0.0", - "num_cpus", -] - [[package]] name = "derive_builder" version = "0.10.2" @@ -1462,9 +1469,9 @@ dependencies = [ [[package]] name = "femme" -version = "2.2.0" +version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2997b612abb06bc299486c807e68c5fd12e7618e69cf34c5958ca6b575674403" +checksum = "cc04871e5ae3aa2952d552dae6b291b3099723bf779a8054281c1366a54613ef" dependencies = [ "cfg-if 1.0.0", "js-sys", @@ -1512,9 +1519,9 @@ dependencies = [ [[package]] name = "fluvio" -version = "0.12.9" +version = "0.12.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8a70358eb95b4647d41a55d8b1e4247a517b5ac8b61e1a989796bf97d371a79" +checksum = "33255695d2f7020fb0a9fa8642ed3bd3eb97c3b789d2ecdbae9d2f84eb3d512c" dependencies = [ "async-channel", "async-lock", @@ -1528,7 +1535,7 @@ dependencies = [ "dirs", "event-listener", "fluvio-compression", - "fluvio-dataplane-protocol", + "fluvio-dataplane-protocol 0.11.1", "fluvio-future", "fluvio-protocol", "fluvio-sc-schema", @@ -1537,10 +1544,9 @@ dependencies = [ "fluvio-spu-schema", "fluvio-types", "futures-util", - "instant", "once_cell", "pin-project-lite 0.2.9", - "semver 1.0.7", + "semver 1.0.9", "serde", "serde_json", "siphasher", @@ -1581,13 +1587,13 @@ dependencies = [ [[package]] name = "fluvio-controlplane-metadata" -version = "0.15.2" +version = "0.15.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "121616b9fbcd70305fd4f933914aa62f13aa9221d01cae11819e8dafd1f3dd65" +checksum = "8d876315d312d61a7822df10f60363a5ef396b40ea29b8ad4ab7ecfded7122e1" dependencies = [ "async-trait", "base64 0.13.0", - "fluvio-dataplane-protocol", + "fluvio-dataplane-protocol 0.11.1", "fluvio-future", "fluvio-protocol", "fluvio-stream-model", @@ -1614,16 +1620,40 @@ dependencies = [ "flv-util", "futures-util", "once_cell", - "semver 1.0.7", + "semver 1.0.9", + "thiserror", + "tracing", +] + +[[package]] +name = "fluvio-dataplane-protocol" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fbea89b47f14d209504903a86788c7013907a69167de56e35caa80e131aef0f" +dependencies = [ + "bytes", + "cfg-if 1.0.0", + "chrono", + "content_inspector", + "crc32c", + "eyre", + "fluvio-compression", + "fluvio-future", + "fluvio-protocol", + "fluvio-types", + "flv-util", + "futures-util", + "once_cell", + "semver 1.0.9", "thiserror", "tracing", ] [[package]] name = "fluvio-future" -version = "0.3.16" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4391dfc45967422482f128c2e3c44f0aded28ac64f3d9cf50a2a476ef37fee0" +checksum = "cc6a89ad97db849764e15c37fa98146599d97e1e5c10460ec3b73d67277b8de9" dependencies = [ "async-io", "async-net", @@ -1635,7 +1665,7 @@ dependencies = [ "futures-lite", "futures-util", "log", - "nix", + "nix 0.23.1", "openssl", "openssl-sys", "pin-project", @@ -1654,22 +1684,22 @@ dependencies = [ [[package]] name = "fluvio-protocol" -version = "0.7.6" +version = "0.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "affa551e5cb1ef263ecaa47cdf36ba013872574fa3de1ff8660dd3ddcde89810" +checksum = "c875b24392b835610a0e8c620d11e696014bd106f48a9759bbd080bd1f25d4dd" dependencies = [ "bytes", "fluvio-future", "fluvio-protocol-derive", - "tokio-util 0.7.1", + "tokio-util 0.7.2", "tracing", ] [[package]] name = "fluvio-protocol-derive" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe2b322665711776c5edbe78187389f72e5cf183598a1bfc59723cab97156659" +checksum = "37e1b0457e0fd958a6a8f54a508fda58ba169ba667d3fe26367fa50eee4d733b" dependencies = [ "proc-macro2", "quote", @@ -1679,12 +1709,12 @@ dependencies = [ [[package]] name = "fluvio-sc-schema" -version = "0.13.0" +version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "492d00f9e00e2164a6b973cd1161dcbfc898c68a8c3c4bedfdbc369c4f14ff90" +checksum = "a9f16db4cf0036c3c5890e3e63c48c460c5a7b59c73440fba6696d4b8bed6e1d" dependencies = [ "fluvio-controlplane-metadata", - "fluvio-dataplane-protocol", + "fluvio-dataplane-protocol 0.11.1", "fluvio-protocol", "fluvio-types", "log", @@ -1696,28 +1726,28 @@ dependencies = [ [[package]] name = "fluvio-smartengine" -version = "0.2.7" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b261fbe62efcbb5aa1eeba6ce6c090234f1ae003fe91626ef375cd61e4956d7" +checksum = "09dc4eb495c5ef4f6efc6dd5ac9c87ff8db448f5bc69914f644111097babd5da" dependencies = [ "anyhow", - "fluvio-dataplane-protocol", + "fluvio-dataplane-protocol 0.11.1", "fluvio-future", "fluvio-spu-schema", "futures-util", - "nix", + "nix 0.24.1", "tracing", "wasmtime", ] [[package]] name = "fluvio-smartmodule" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42aa97596de4c7acac41d72f1b5879af627593e21112bf6e0597cfd276fee13d" +checksum = "1ef31561078a389034e04bbeae58cd4cac64282bd7e75c8195422fba4fe68b53" dependencies = [ "eyre", - "fluvio-dataplane-protocol", + "fluvio-dataplane-protocol 0.11.1", "fluvio-smartmodule-derive", ] @@ -1751,19 +1781,19 @@ dependencies = [ "pin-project", "thiserror", "tokio", - "tokio-util 0.7.1", + "tokio-util 0.7.2", "tracing", ] [[package]] name = "fluvio-spu-schema" -version = "0.9.3" +version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d272bb19941dc300c76b65a7952fc70dd7b277f1f98651f3c47dceb7f2bc6c81" +checksum = "a300a9e13672016511b71d3046c061fcfae2f66b48d5540decba7965b4975d4c" dependencies = [ "bytes", "flate2", - "fluvio-dataplane-protocol", + "fluvio-dataplane-protocol 0.11.1", "fluvio-protocol", "log", "serde", @@ -1773,9 +1803,9 @@ dependencies = [ [[package]] name = "fluvio-stream-model" -version = "0.6.0" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbb74401ad8394d4c5f54efbe2864428ade43b9db3cffe76c8315705050c186f" +checksum = "4e421ff1be3b83020a3a97cb7ef8b6f045bca3b1eea2c95f0cea791eeeaaf988" dependencies = [ "async-rwlock", "event-listener", @@ -1812,9 +1842,9 @@ dependencies = [ [[package]] name = "fluvio-types" -version = "0.3.4" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01de48bb1ae838241bca54fef31c2e8c9c891e8600bfe9754eaaeff0a0d1e9a4" +checksum = "34e8812836b82cdeeee3448cfac91ad6957b25e0e92826cbd1a51a240cfef1a9" dependencies = [ "event-listener", "thiserror", @@ -2088,11 +2118,11 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http 0.2.6", + "http 0.2.7", "indexmap", "slab", "tokio", - "tokio-util 0.7.1", + "tokio-util 0.7.2", "tracing", ] @@ -2170,7 +2200,7 @@ name = "http" version = "0.2.1" dependencies = [ "fluvio-connectors-common", - "fluvio-dataplane-protocol", + "fluvio-dataplane-protocol 0.10.1", "fluvio-future", "reqwest", "rstest", @@ -2187,9 +2217,9 @@ dependencies = [ [[package]] name = "http" -version = "0.2.6" +version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31f4c6746584866f0feabcc69893c5b51beef3831656a968ed7ae254cdc4fd03" +checksum = "ff8670570af52249509a86f5e3e18a08c60b177071826898fde8997cf5f6bfbb" dependencies = [ "bytes", "fnv", @@ -2203,19 +2233,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6" dependencies = [ "bytes", - "http 0.2.6", + "http 0.2.7", "pin-project-lite 0.2.9", ] [[package]] name = "http-client" -version = "6.5.1" +version = "6.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea880b03c18a7e981d7fb3608b8904a98425d53c440758fcebf7d934aa56547c" +checksum = "e023af341b797ce2c039f7c6e1d347b68d0f7fd0bc7ac234fe69cfadcca1f89a" dependencies = [ "async-trait", "cfg-if 1.0.0", - "dashmap", "http-types", "log", ] @@ -2280,7 +2309,7 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http 0.2.6", + "http 0.2.7", "http-body", "httparse", "httpdate", @@ -2418,9 +2447,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" +checksum = "112c678d4050afce233f4f2852bb2eb519230b3cf12f33585275537d7e41578d" [[package]] name = "ittapi-rs" @@ -2449,11 +2478,62 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kafka" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b11c86b0c0c9a9d89b136b2938a5b46a35c40f66eced2f09c76458b17dadfc2a" +dependencies = [ + "byteorder", + "crc", + "flate2", + "fnv", + "openssl", + "openssl-sys", + "ref_slice", + "snap", + "thiserror", + "tracing", + "twox-hash", +] + +[[package]] +name = "kafka-sink" +version = "0.1.1" +dependencies = [ + "anyhow", + "fluvio-connectors-common", + "fluvio-future", + "kafka", + "schemars", + "serde", + "serde_json", + "structopt", + "tokio", + "tokio-stream", +] + +[[package]] +name = "kafka-source" +version = "0.1.1" +dependencies = [ + "anyhow", + "fluvio-connectors-common", + "fluvio-future", + "kafka", + "schemars", + "serde", + "serde_json", + "structopt", + "tokio", + "tokio-stream", +] + [[package]] name = "kqueue" -version = "1.0.5" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97caf428b83f7c86809b7450722cd1f2b1fc7fb23aa7b9dee7e72ed14d048352" +checksum = "4d6112e8f37b59803ac47a42d14f1f3a59bbf72fc6857ffc5be455e28a691f8e" dependencies = [ "kqueue-sys", "libc", @@ -2492,9 +2572,9 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" [[package]] name = "libc" -version = "0.2.124" +version = "0.2.126" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21a41fed9d98f27ab1c6d161da622a4fa35e8a54a8adc24bbf3ddd0ef70b0e50" +checksum = "349d5a591cd28b49e1d1037471617a32ddcda5731b99419008085f72d5a53836" [[package]] name = "linked-hash-map" @@ -2520,9 +2600,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.16" +version = "0.4.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6389c490849ff5bc16be905ae24bc913a9c8892e19b2341dbc175e14c341c2b8" +checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" dependencies = [ "cfg-if 1.0.0", "serde", @@ -2573,9 +2653,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.4.1" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" +checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" [[package]] name = "memfd" @@ -2618,25 +2698,14 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52da4364ffb0e4fe33a9841a98a3f3014fb964045ce4f7a45a398243c8d6b0c9" +checksum = "713d550d9b44d89174e066b7a6217ae06234c10cb47819a88290d2b353c31799" dependencies = [ "libc", "log", - "miow", - "ntapi", "wasi 0.11.0+wasi-snapshot-preview1", - "winapi", -] - -[[package]] -name = "miow" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" -dependencies = [ - "winapi", + "windows-sys", ] [[package]] @@ -2654,7 +2723,7 @@ dependencies = [ "fluvio-connectors-common", "fluvio-future", "rumqttc", - "rustls 0.20.4", + "rustls 0.20.6", "rustls-native-certs 0.6.2", "schemars", "serde", @@ -2697,6 +2766,18 @@ dependencies = [ "memoffset", ] +[[package]] +name = "nix" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f17df307904acd05aa8e32e97bb20f2a0df1728bbc2d771ae8f9a90463441e9" +dependencies = [ + "bitflags", + "cfg-if 1.0.0", + "libc", + "memoffset", +] + [[package]] name = "nom" version = "7.1.1" @@ -2709,9 +2790,9 @@ dependencies = [ [[package]] name = "notify" -version = "5.0.0-pre.14" +version = "5.0.0-pre.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d13c22db70a63592e098fb51735bab36646821e6389a0ba171f3549facdf0b74" +checksum = "553f9844ad0b0824605c20fb55a661679782680410abfb1a8144c2e7e437e7a7" dependencies = [ "bitflags", "crossbeam-channel", @@ -2725,20 +2806,11 @@ dependencies = [ "winapi", ] -[[package]] -name = "ntapi" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c28774a7fd2fbb4f0babd8237ce554b73af68021b5f695a3cebd6c59bac0980f" -dependencies = [ - "winapi", -] - [[package]] name = "num-integer" -version = "0.1.44" +version = "0.1.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" +checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" dependencies = [ "autocfg", "num-traits", @@ -2746,9 +2818,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" +checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd" dependencies = [ "autocfg", ] @@ -2765,9 +2837,9 @@ dependencies = [ [[package]] name = "num_threads" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aba1801fb138d8e85e11d0fc70baf4fe1cdfffda7c6cd34a854905df588e5ed0" +checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44" dependencies = [ "libc", ] @@ -2785,18 +2857,18 @@ dependencies = [ [[package]] name = "object" -version = "0.28.3" +version = "0.28.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40bec70ba014595f99f7aa110b84331ffe1ee9aece7fe6f387cc7e3ecda4d456" +checksum = "e42c982f2d955fac81dd7e1d0e1426a7d702acd9c98d19ab01083a6a0328c424" dependencies = [ "memchr", ] [[package]] name = "once_cell" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9" +checksum = "7b10983b38c53aebdf33f542c6275b0f58a238129d00c4ae0e6fb59738d783ca" [[package]] name = "opaque-debug" @@ -2806,18 +2878,30 @@ checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" [[package]] name = "openssl" -version = "0.10.38" +version = "0.10.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c7ae222234c30df141154f159066c5093ff73b63204dcda7121eb082fc56a95" +checksum = "fb81a6430ac911acb25fe5ac8f1d2af1b4ea8a4fdfda0f1ee4292af2e2d8eb0e" dependencies = [ "bitflags", "cfg-if 1.0.0", "foreign-types", "libc", "once_cell", + "openssl-macros", "openssl-sys", ] +[[package]] +name = "openssl-macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "openssl-probe" version = "0.1.5" @@ -2826,18 +2910,18 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-src" -version = "111.18.0+1.1.1n" +version = "111.20.0+1.1.1o" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7897a926e1e8d00219127dc020130eca4292e5ca666dd592480d72c3eca2ff6c" +checksum = "92892c4f87d56e376e469ace79f1128fdaded07646ddf73aa0be4706ff712dec" dependencies = [ "cc", ] [[package]] name = "openssl-sys" -version = "0.9.72" +version = "0.9.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e46109c383602735fa0a2e48dd2b7c892b048e1bf69e5c3b1d804b7d9c203cb" +checksum = "9d5fd19fb3e0a8191c1e34935718976a3e70c112ab9a24af6d7cadccd9d90bc0" dependencies = [ "autocfg", "cc", @@ -2871,7 +2955,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58" dependencies = [ "lock_api", - "parking_lot_core 0.9.2", + "parking_lot_core 0.9.3", ] [[package]] @@ -2890,9 +2974,9 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.2" +version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "995f667a6c822200b0433ac218e05582f0e2efa1b922a3fd2fbaadc5f87bab37" +checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929" dependencies = [ "cfg-if 1.0.0", "libc", @@ -3034,9 +3118,9 @@ dependencies = [ [[package]] name = "postgres-protocol" -version = "0.6.3" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79ec03bce71f18b4a27c4c64c6ba2ddf74686d69b91d8714fb32ead3adaed713" +checksum = "878c6cbf956e03af9aa8204b407b9cbf47c072164800aa918c516cd4b056c50c" dependencies = [ "base64 0.13.0", "byteorder", @@ -3065,7 +3149,7 @@ dependencies = [ "once_cell", "postgres-protocol 0.6.1", "postgres-source", - "postgres-types 0.2.2", + "postgres-types 0.2.3", "schemars", "serde", "serde_json", @@ -3119,13 +3203,13 @@ dependencies = [ [[package]] name = "postgres-types" -version = "0.2.2" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04619f94ba0cc80999f4fc7073607cb825bc739a883cb6d20900fc5e009d6b0d" +checksum = "ebd6e8b7189a73169290e89bd24c771071f1012d8fe6f738f5226531f0b03d89" dependencies = [ "bytes", "fallible-iterator", - "postgres-protocol 0.6.3", + "postgres-protocol 0.6.4", ] [[package]] @@ -3166,11 +3250,11 @@ checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" [[package]] name = "proc-macro2" -version = "1.0.37" +version = "1.0.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec757218438d5fda206afc041538b2f6d889286160d649a86a24d37e1235afd1" +checksum = "c54b25569025b7fc9651de43004ae593a75ad88543b17178aa5e1b9c4f15f56f" dependencies = [ - "unicode-xid", + "unicode-ident", ] [[package]] @@ -3264,9 +3348,9 @@ dependencies = [ [[package]] name = "rayon" -version = "1.5.2" +version = "1.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd249e82c21598a9a426a4e00dd7adc1d640b22445ec8545feef801d1a74c221" +checksum = "bd99e5772ead8baa5215278c9b15bf92087709e9c1b2d1f97cdb5a183c933a7d" dependencies = [ "autocfg", "crossbeam-deque", @@ -3276,9 +3360,9 @@ dependencies = [ [[package]] name = "rayon-core" -version = "1.9.2" +version = "1.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f51245e1e62e1f1629cbfec37b5793bbabcaeb90f30e94d2ba03564687353e4" +checksum = "258bcdb5ac6dad48491bb2992db6b7cf74878b0384908af124823d118c99683f" dependencies = [ "crossbeam-channel", "crossbeam-deque", @@ -3306,6 +3390,12 @@ dependencies = [ "thiserror", ] +[[package]] +name = "ref_slice" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4ed1d73fb92eba9b841ba2aef69533a060ccc0d3ec71c90aeda5996d4afb7a9" + [[package]] name = "regalloc" version = "0.0.34" @@ -3319,9 +3409,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.5.5" +version = "1.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286" +checksum = "d83f127d94bdbcda4c8cc2e50f6f84f4b611f69c902699ca385a39c3a75f9ff1" dependencies = [ "aho-corasick", "memchr", @@ -3339,9 +3429,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.6.25" +version = "0.6.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" +checksum = "49b3de9ec5dc0a3417da371aab17d729997c15010e7fd24ff707773a33bddb64" [[package]] name = "region" @@ -3376,7 +3466,7 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http 0.2.6", + "http 0.2.7", "http-body", "hyper", "hyper-tls", @@ -3467,13 +3557,13 @@ dependencies = [ "async-channel", "async-tungstenite", "bytes", - "http 0.2.6", + "http 0.2.7", "log", "pollster", "rustls-pemfile 0.3.0", "thiserror", "tokio", - "tokio-rustls 0.23.3", + "tokio-rustls 0.23.4", "url", "ws_stream_tungstenite", ] @@ -3505,7 +3595,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" dependencies = [ - "semver 1.0.7", + "semver 1.0.9", ] [[package]] @@ -3537,9 +3627,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.20.4" +version = "0.20.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fbfeb8d0ddb84706bc597a5574ab8912817c52a397f819e5b614e2265206921" +checksum = "5aab8ee6c7097ed6057f43c187a62418d0c05a4bd5f18b3571db50ee0f9ce033" dependencies = [ "log", "ring", @@ -3591,9 +3681,9 @@ dependencies = [ [[package]] name = "ryu" -version = "1.0.9" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f" +checksum = "f3f6f92acf49d1b98f7a81226834412ada05458b7364277387724a237f062695" [[package]] name = "same-file" @@ -3606,19 +3696,19 @@ dependencies = [ [[package]] name = "schannel" -version = "0.1.19" +version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75" +checksum = "88d6731146462ea25d9244b2ed5fd1d716d25c52e4d54aa4fb0f3c4e9854dbe2" dependencies = [ "lazy_static", - "winapi", + "windows-sys", ] [[package]] name = "schemars" -version = "0.8.8" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6b5a3c80cea1ab61f4260238409510e814e38b4b563c06044edf91e7dc070e3" +checksum = "1847b767a3d62d95cbf3d8a9f0e421cf57a0d8aa4f411d4b16525afb0284d4ed" dependencies = [ "dyn-clone", "schemars_derive", @@ -3629,9 +3719,9 @@ dependencies = [ [[package]] name = "schemars_derive" -version = "0.8.8" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41ae4dce13e8614c46ac3c38ef1c0d668b101df6ac39817aebdaa26642ddae9b" +checksum = "af4d7e1b012cb3d9129567661a63755ea4b8a7386d339dc945ae187e403c6743" dependencies = [ "proc-macro2", "quote", @@ -3699,9 +3789,9 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.7" +version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d65bd28f48be7196d222d95b9243287f48d27aca604e08497513019ff0502cc4" +checksum = "8cb243bdfdb5936c8dc3c45762a19d12ab4550cdc753bc247637d4ec35a040fd" dependencies = [ "serde", ] @@ -3720,18 +3810,18 @@ checksum = "930c0acf610d3fdb5e2ab6213019aaa04e227ebe9547b0649ba599b16d788bd7" [[package]] name = "serde" -version = "1.0.136" +version = "1.0.137" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce31e24b01e1e524df96f1c2fdd054405f8d7376249a5110886fb4b658484789" +checksum = "61ea8d54c77f8315140a05f4c7237403bf38b72704d031543aa1d16abbf517d1" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.136" +version = "1.0.137" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08597e7152fcd306f41838ed3e37be9eaeed2b61c42e2117266a554fab4662f9" +checksum = "1f26faba0c3959972377d3b2d306ee9f71faee9714294e41bb777f83f88578be" dependencies = [ "proc-macro2", "quote", @@ -3740,9 +3830,9 @@ dependencies = [ [[package]] name = "serde_derive_internals" -version = "0.25.0" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1dbab34ca63057a1f15280bdf3c39f2b1eb1b54c17e98360e511637aef7418c6" +checksum = "85bf8229e7920a9f636479437026331ce11aa132b4dde37d121944a44d6e5f3c" dependencies = [ "proc-macro2", "quote", @@ -3760,9 +3850,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.79" +version = "1.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e8d9fa5c3b304765ce1fd9c4c8a3de2c8db365a5b91be52f186efc675681d95" +checksum = "9b7ce2b32a1aed03c558dc61a5cd328f15aff2dbc17daad8fb8af04d2100e15c" dependencies = [ "itoa", "ryu", @@ -3794,9 +3884,9 @@ dependencies = [ [[package]] name = "serde_yaml" -version = "0.8.23" +version = "0.8.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a521f2940385c165a24ee286aa8599633d162077a54bdcae2a6fd5a7bfa7a0" +checksum = "707d15895415db6628332b737c838b88c598522e4dc70647e59b72312924aebc" dependencies = [ "indexmap", "ryu", @@ -3867,9 +3957,9 @@ dependencies = [ [[package]] name = "signal-hook" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "647c97df271007dcea485bb74ffdb57f2e683f1306c854f468a0c244badabf2d" +checksum = "a253b5e89e2698464fc26b545c9edceb338e18a89effeeecfea192c3025be29d" dependencies = [ "libc", "signal-hook-registry", @@ -4091,13 +4181,13 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.91" +version = "1.0.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b683b2b825c8eef438b77c36a06dc262294da3d5a5813fac20da149241dcd44d" +checksum = "fbaf6116ab8924f39d52792136fb74fd60a80194cf1b1c6ffa6453eef1c3f942" dependencies = [ "proc-macro2", "quote", - "unicode-xid", + "unicode-ident", ] [[package]] @@ -4146,7 +4236,7 @@ dependencies = [ "anyhow", "async-std", "fluvio-connectors-common", - "fluvio-dataplane-protocol", + "fluvio-dataplane-protocol 0.10.1", "schemars", "serde", "serde_json", @@ -4164,18 +4254,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.30" +version = "1.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "854babe52e4df1653706b98fcfc05843010039b406875930a70e4d9644e5c417" +checksum = "bd829fe32373d27f76265620b5309d0340cb8550f523c1dda251d6298069069a" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.30" +version = "1.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa32fd3f627f367fe16f893e2597ae3c05020f8bba2666a4e6ea73d377e5714b" +checksum = "0396bc89e626244658bef819e22d0cc459e795a5ebe878e6ec336d1674a8d79a" dependencies = [ "proc-macro2", "quote", @@ -4290,9 +4380,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.18.0" +version = "1.18.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f48b6d60512a392e34dbf7fd456249fd2de3c83669ab642e021903f4015185b" +checksum = "4903bf0427cf68dddd5aa6a93220756f8be0c34fcfa9f5e6191e103e15a31395" dependencies = [ "bytes", "libc", @@ -4348,7 +4438,7 @@ dependencies = [ "postgres-types 0.2.1", "socket2", "tokio", - "tokio-util 0.6.9", + "tokio-util 0.6.10", ] [[package]] @@ -4364,11 +4454,11 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.23.3" +version = "0.23.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4151fda0cf2798550ad0b34bcfc9b9dcc2a9d2471c895c68f3a8818e54f2389e" +checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" dependencies = [ - "rustls 0.20.4", + "rustls 0.20.6", "tokio", "webpki 0.22.0", ] @@ -4386,9 +4476,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.6.9" +version = "0.6.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0" +checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507" dependencies = [ "bytes", "futures-core", @@ -4400,9 +4490,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0edfdeb067411dba2044da6d1cb2df793dd35add7888d73c16e3381ded401764" +checksum = "f988a1a1adc2fb21f9c12aa96441da33a1728193ae0b95d2be22dbd17fcb4e5c" dependencies = [ "bytes", "futures-core", @@ -4528,11 +4618,11 @@ dependencies = [ "base64 0.13.0", "byteorder", "bytes", - "http 0.2.6", + "http 0.2.7", "httparse", "log", "rand 0.8.5", - "rustls 0.20.4", + "rustls 0.20.6", "sha-1", "thiserror", "url", @@ -4542,11 +4632,12 @@ dependencies = [ [[package]] name = "twox-hash" -version = "1.6.2" +version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ee73e6e4924fe940354b8d4d98cad5231175d615cd855b758adc658c0aac6a0" +checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ "cfg-if 1.0.0", + "rand 0.8.5", "static_assertions", ] @@ -4562,6 +4653,12 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992" +[[package]] +name = "unicode-ident" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d22af068fba1eb5edcb4aea19d382b2a3deb4c8f9d475c589b6ada9e0fd493ee" + [[package]] name = "unicode-normalization" version = "0.1.19" @@ -4583,12 +4680,6 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973" -[[package]] -name = "unicode-xid" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" - [[package]] name = "universal-hash" version = "0.4.1" @@ -4648,9 +4739,9 @@ checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" [[package]] name = "value-bag" -version = "1.0.0-alpha.8" +version = "1.0.0-alpha.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79923f7731dc61ebfba3633098bf3ac533bbd35ccd8c57e7088d9a5eebe0263f" +checksum = "2209b78d1249f7e6f3293657c9779fe31ced465df091bbd433a1cf88e916ec55" dependencies = [ "ctor", "erased-serde", @@ -4799,9 +4890,9 @@ checksum = "718ed7c55c2add6548cca3ddd6383d738cd73b892df400e96b9aa876f0141d7a" [[package]] name = "wasmtime" -version = "0.35.3" +version = "0.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21ffb4705016d5ca91e18a72ed6822dab50e6d5ddd7045461b17ef19071cdef1" +checksum = "4beb256f8514dd3eb85b03bceb422abfff5f07bf564519cbe0abf80462120bc0" dependencies = [ "anyhow", "async-trait", @@ -4833,9 +4924,9 @@ dependencies = [ [[package]] name = "wasmtime-cache" -version = "0.35.3" +version = "0.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85c6ab24291fa7cb3a181f5669f6c72599b7ef781669759b45c7828c5999d0c0" +checksum = "7a0a7bd250adf9d4ee6eed952058aa5ee102ab25e0f6ed1f9ab1212db5cb4ac9" dependencies = [ "anyhow", "base64 0.13.0", @@ -4853,9 +4944,9 @@ dependencies = [ [[package]] name = "wasmtime-cranelift" -version = "0.35.3" +version = "0.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f04c810078a491b7bc4866ebe045f714d2b95e6b539e1f64009a4a7606be11de" +checksum = "0e8f126b9c55e9550391617049f468878cb9f4af75f33009be72f0c692e1d2ed" dependencies = [ "anyhow", "cranelift-codegen", @@ -4875,9 +4966,9 @@ dependencies = [ [[package]] name = "wasmtime-environ" -version = "0.35.3" +version = "0.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61448266ea164b1ac406363cdcfac81c7c44db4d94c7a81c8620ac6c5c6cdf59" +checksum = "343cfc7e7604b063ef0429e043a06e077a39bcb1ce3e731abe0414431f2610de" dependencies = [ "anyhow", "cranelift-entity", @@ -4895,9 +4986,9 @@ dependencies = [ [[package]] name = "wasmtime-fiber" -version = "0.35.3" +version = "0.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbaaa38c3b48822ab27044e1d4a25a1052157de4c8f27574cb00167e127e320f" +checksum = "40c5449d5dc7908e284c957754e355465c3588aee31f0c547a5f3cef35c16b62" dependencies = [ "cc", "rustix", @@ -4906,9 +4997,9 @@ dependencies = [ [[package]] name = "wasmtime-jit" -version = "0.35.3" +version = "0.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "156b4623c6b0d4b8c24afb846c20525922f538ef464cc024abab7ea8de2109a2" +checksum = "81f3a31bf04f02ec7497ddeda20e14b640b1e4d9e1c55c1ebdb268bf30800e2d" dependencies = [ "addr2line", "anyhow", @@ -4933,9 +5024,9 @@ dependencies = [ [[package]] name = "wasmtime-jit-debug" -version = "0.35.3" +version = "0.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5dc31f811760a6c76b2672c404866fd19b75e5fb3b0075a3e377a6846490654" +checksum = "4e725c3a37e49bd95d2d350c9f1b6641ac6e75bc357204e857ac058d0c64b720" dependencies = [ "lazy_static", "object 0.27.1", @@ -4944,9 +5035,9 @@ dependencies = [ [[package]] name = "wasmtime-runtime" -version = "0.35.3" +version = "0.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f907beaff69d4d920fa4688411ee4cc75c0f01859e424677f9e426e2ef749864" +checksum = "2ed30d50ffd763873f49df0c94efa20c17749f375518117d6677cc29b631a1fc" dependencies = [ "anyhow", "backtrace", @@ -4971,9 +5062,9 @@ dependencies = [ [[package]] name = "wasmtime-types" -version = "0.35.3" +version = "0.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "514ef0e5fd197b9609dc9eb74beba0c84d5a12b2417cbae55534633329ba4852" +checksum = "72080d5bdd54af03de7f5ca8b67dcd8ea36f4dfec75fc89a976ebc847ee1516e" dependencies = [ "cranelift-entity", "serde", @@ -4983,9 +5074,9 @@ dependencies = [ [[package]] name = "wast" -version = "40.0.0" +version = "41.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9bb4f48a8b083dbc50e291e430afb8f524092bb00428957bcc63f49f856c64ac" +checksum = "f882898b8b817cc4edc16aa3692fdc087b356edc8cc0c2164f5b5181e31c3870" dependencies = [ "leb128", "memchr", @@ -4994,9 +5085,9 @@ dependencies = [ [[package]] name = "wat" -version = "1.0.42" +version = "1.0.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0401b6395ce0db91629a75b29597ccb66ea29950af9fc859f1bb3a736609c76e" +checksum = "48b3b9b3e39e66c7fd3f8be785e74444d216260f491e93369e317ed6482ff80f" dependencies = [ "wast", ] @@ -5073,9 +5164,9 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] name = "windows-sys" -version = "0.34.0" +version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5acdd78cb4ba54c0045ac14f62d8f94a03d10047904ae2a40afa1e99d8f70825" +checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" dependencies = [ "windows_aarch64_msvc", "windows_i686_gnu", @@ -5086,33 +5177,33 @@ dependencies = [ [[package]] name = "windows_aarch64_msvc" -version = "0.34.0" +version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17cffbe740121affb56fad0fc0e421804adf0ae00891205213b5cecd30db881d" +checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" [[package]] name = "windows_i686_gnu" -version = "0.34.0" +version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2564fde759adb79129d9b4f54be42b32c89970c18ebf93124ca8870a498688ed" +checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" [[package]] name = "windows_i686_msvc" -version = "0.34.0" +version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cd9d32ba70453522332c14d38814bceeb747d80b3958676007acadd7e166956" +checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" [[package]] name = "windows_x86_64_gnu" -version = "0.34.0" +version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfce6deae227ee8d356d19effc141a509cc503dfd1f850622ec4b0f84428e1f4" +checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" [[package]] name = "windows_x86_64_msvc" -version = "0.34.0" +version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d19538ccc21819d01deaf88d6a17eae6596a12e9aafdbb97916fb49896d89de9" +checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" [[package]] name = "winreg" @@ -5178,24 +5269,24 @@ dependencies = [ [[package]] name = "zeroize" -version = "1.5.4" +version = "1.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7eb5728b8afd3f280a869ce1d4c554ffaed35f45c231fc41bfbd0381bef50317" +checksum = "94693807d016b2f2d2e14420eb3bfcca689311ff775dcf113d74ea624b7cdf07" [[package]] name = "zstd" -version = "0.10.0+zstd.1.5.2" +version = "0.11.2+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b1365becbe415f3f0fcd024e2f7b45bacfb5bdd055f0dc113571394114e7bdd" +checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4" dependencies = [ "zstd-safe", ] [[package]] name = "zstd-safe" -version = "4.1.4+zstd.1.5.2" +version = "5.0.2+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f7cd17c9af1a4d6c24beb1cc54b17e2ef7b593dc92f19e9d9acad8b182bbaee" +checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db" dependencies = [ "libc", "zstd-sys", @@ -5203,9 +5294,9 @@ dependencies = [ [[package]] name = "zstd-sys" -version = "1.6.3+zstd.1.5.2" +version = "2.0.1+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc49afa5c8d634e75761feda8c592051e7eeb4683ba827211eb0d731d3402ea8" +checksum = "9fd07cbbc53846d9145dbffdf6dd09a7a0aa52be46741825f5c97bdd4f73f12b" dependencies = [ "cc", "libc", diff --git a/Cargo.toml b/Cargo.toml index 30346c5c..e13698f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,9 +5,11 @@ members = [ "rust-connectors/sources/mqtt", "rust-connectors/sources/http", "rust-connectors/sources/postgres", + "rust-connectors/sources/kafka", "rust-connectors/sinks/postgres", "rust-connectors/sinks/slack", "rust-connectors/sinks/dynamodb/", + "rust-connectors/sinks/kafka", "rust-connectors/models/fluvio-model-postgres", "rust-connectors/utils/mocks/http-json-mock/", "rust-connectors/utils/test-connector", diff --git a/rust-connectors/common/src/opt.rs b/rust-connectors/common/src/opt.rs index 83c5dfc9..a8a09bf1 100644 --- a/rust-connectors/common/src/opt.rs +++ b/rust-connectors/common/src/opt.rs @@ -22,6 +22,10 @@ pub struct CommonSourceOpt { #[schemars(skip)] pub fluvio_topic: String, + #[structopt(long, default_value = "0")] + #[schemars(skip)] + pub fluvio_partition: i32, + /// The rust log level. If it is not defined, `RUST_LOG` environment variable /// will be used. If environment variable is not defined, /// then INFO level will be used. @@ -154,7 +158,7 @@ impl CommonSourceOpt { if let Some(initial_value) = &self.aggregate_initial_value { if initial_value == "use-last" { let consumer = fluvio - .partition_consumer(self.fluvio_topic.clone(), 0) + .partition_consumer(self.fluvio_topic.clone(), self.fluvio_partition) .await?; let stream = consumer.stream(fluvio::Offset::from_end(1)).await?; let timeout = stream.timeout(Duration::from_millis(3000)); @@ -203,14 +207,13 @@ impl CommonSourceOpt { } } - async fn create_consumer(&self, partition: i32) -> anyhow::Result { + async fn create_consumer(&self) -> anyhow::Result { self.ensure_topic_exists().await?; - Ok(fluvio::consumer(&self.fluvio_topic, partition).await?) + Ok(fluvio::consumer(&self.fluvio_topic, self.fluvio_partition).await?) } pub async fn create_consumer_stream( &self, - partition: i32, ) -> anyhow::Result>> { let fluvio = fluvio::Fluvio::connect().await?; let wasm_invocation: Option = @@ -252,7 +255,7 @@ impl CommonSourceOpt { let mut builder = ConsumerConfig::builder(); builder.smartmodule(wasm_invocation); let config = builder.build()?; - let consumer = self.create_consumer(partition).await?; + let consumer = self.create_consumer().await?; let offset = fluvio::Offset::end(); Ok(consumer.stream_with_config(offset, config).await?) } diff --git a/rust-connectors/sinks/dynamodb/src/main.rs b/rust-connectors/sinks/dynamodb/src/main.rs index 5d164166..8d08f368 100644 --- a/rust-connectors/sinks/dynamodb/src/main.rs +++ b/rust-connectors/sinks/dynamodb/src/main.rs @@ -64,7 +64,7 @@ impl DynamoDbOpt { let client = Client::from_conf(dynamodb_local_config); let _ = self.create_table(&client).await?; - let mut stream = self.common.create_consumer_stream(0).await?; + let mut stream = self.common.create_consumer_stream().await?; info!("Starting stream"); while let Some(Ok(record)) = stream.next().await { if let Err(e) = self.send_to_dynamodb(&record, &client).await { diff --git a/rust-connectors/sinks/kafka/CHANGELOG.md b/rust-connectors/sinks/kafka/CHANGELOG.md new file mode 100644 index 00000000..d5199577 --- /dev/null +++ b/rust-connectors/sinks/kafka/CHANGELOG.md @@ -0,0 +1,3 @@ +# Slack Connector Change Log + +## http Version 0.1.0 - UNRELEASED diff --git a/rust-connectors/sinks/kafka/Cargo.toml b/rust-connectors/sinks/kafka/Cargo.toml new file mode 100644 index 00000000..3a151c7f --- /dev/null +++ b/rust-connectors/sinks/kafka/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "kafka-sink" +version = "0.1.1" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +fluvio-connectors-common = { path = "../../common" } +anyhow = "1.0.56" +schemars = { version = "0.8", features = ["url"] } +structopt = "0.3" +tokio = { version = "1", features = ["full"] } +serde_json = "1" + +serde = "1" +tokio-stream = "0.1" +fluvio-future = { version = "0.3.15", features = ["subscriber"] } + +kafka = "0.9.0" diff --git a/rust-connectors/sinks/kafka/README.md b/rust-connectors/sinks/kafka/README.md new file mode 100644 index 00000000..44430a52 --- /dev/null +++ b/rust-connectors/sinks/kafka/README.md @@ -0,0 +1,10 @@ +# Fluvio Slack Connector + +## Slack Sink Connector + +Controls the Source connector + +| Option | default | type | description | +| :--- | :--- | :--- | :---- | +| WEBHOOK_URL | - | String | The secret key for the slack webhook url | +| webhook-url | - | String | The paramaters key for the slack webhook url| diff --git a/rust-connectors/sinks/kafka/src/main.rs b/rust-connectors/sinks/kafka/src/main.rs new file mode 100644 index 00000000..9a6c35d8 --- /dev/null +++ b/rust-connectors/sinks/kafka/src/main.rs @@ -0,0 +1,70 @@ +use fluvio_connectors_common::opt::{CommonSourceOpt, Record}; +use fluvio_future::tracing::{debug, info}; +use schemars::schema_for; +use schemars::JsonSchema; +use structopt::StructOpt; +use tokio_stream::StreamExt; + +use kafka::client::{KafkaClient, ProduceMessage, RequiredAcks}; +use std::time::Duration; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + if let Some("metadata") = std::env::args().nth(1).as_deref() { + let schema = serde_json::json!({ + "name": env!("CARGO_PKG_NAME"), + "version": env!("CARGO_PKG_VERSION"), + "description": env!("CARGO_PKG_DESCRIPTION"), + "direction": "Source", + "schema": schema_for!(KafkaOpt), + }); + println!("{}", serde_json::to_string_pretty(&schema).unwrap()); + return Ok(()); + } + let opts: KafkaOpt = KafkaOpt::from_args(); + opts.common.enable_logging(); + let _ = opts.execute().await?; + Ok(()) +} + +#[derive(StructOpt, Debug, JsonSchema, Clone)] +pub struct KafkaOpt { + #[structopt(long, env = "KAFKA_URL", hide_env_values = true)] + pub kafka_url: String, + + #[structopt(flatten)] + #[schemars(flatten)] + pub common: CommonSourceOpt, +} + +impl KafkaOpt { + pub async fn execute(&self) -> anyhow::Result<()> { + let mut stream = self.common.create_consumer_stream().await?; + let mut kafka_client = KafkaClient::new(vec![self.kafka_url.clone()]); + let _ = kafka_client.load_metadata_all()?; + + info!("Starting stream"); + while let Some(Ok(record)) = stream.next().await { + let _ = self.send_to_kafka(&record, &mut kafka_client).await; + } + Ok(()) + } + pub async fn send_to_kafka( + &self, + record: &Record, + kafka_client: &mut KafkaClient, + ) -> anyhow::Result<()> { + let text = String::from_utf8_lossy(record.value()); + debug!("Sending {:?}, to kafka", text); + let msg = ProduceMessage::new( + self.common.fluvio_topic.as_str(), + 0, + record.key(), + Some(record.value()), + ); + let req = vec![msg]; + let _resp = + kafka_client.produce_messages(RequiredAcks::One, Duration::from_millis(100), req)?; + Ok(()) + } +} diff --git a/rust-connectors/sinks/slack/src/main.rs b/rust-connectors/sinks/slack/src/main.rs index 77159bb5..f35bbf63 100644 --- a/rust-connectors/sinks/slack/src/main.rs +++ b/rust-connectors/sinks/slack/src/main.rs @@ -37,7 +37,7 @@ pub struct SlackOpt { impl SlackOpt { pub async fn execute(&self) -> anyhow::Result<()> { - let mut stream = self.common.create_consumer_stream(0).await?; + let mut stream = self.common.create_consumer_stream().await?; info!("Starting stream"); while let Some(Ok(record)) = stream.next().await { let _ = self.send_to_slack(&record).await; From a44ada4b5d4fb1485348f21b4d9f50e010eac71e Mon Sep 17 00:00:00 2001 From: Sebastian Imlay Date: Fri, 20 May 2022 16:25:56 -0400 Subject: [PATCH 02/18] Add kafka source to git tree --- rust-connectors/sources/kafka/CHANGELOG.md | 3 + rust-connectors/sources/kafka/Cargo.toml | 20 +++++++ rust-connectors/sources/kafka/README.md | 10 ++++ rust-connectors/sources/kafka/src/main.rs | 69 ++++++++++++++++++++++ 4 files changed, 102 insertions(+) create mode 100644 rust-connectors/sources/kafka/CHANGELOG.md create mode 100644 rust-connectors/sources/kafka/Cargo.toml create mode 100644 rust-connectors/sources/kafka/README.md create mode 100644 rust-connectors/sources/kafka/src/main.rs diff --git a/rust-connectors/sources/kafka/CHANGELOG.md b/rust-connectors/sources/kafka/CHANGELOG.md new file mode 100644 index 00000000..d5199577 --- /dev/null +++ b/rust-connectors/sources/kafka/CHANGELOG.md @@ -0,0 +1,3 @@ +# Slack Connector Change Log + +## http Version 0.1.0 - UNRELEASED diff --git a/rust-connectors/sources/kafka/Cargo.toml b/rust-connectors/sources/kafka/Cargo.toml new file mode 100644 index 00000000..4c950a53 --- /dev/null +++ b/rust-connectors/sources/kafka/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "kafka-source" +version = "0.1.1" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +fluvio-connectors-common = { path = "../../common" } +anyhow = "1.0.56" +schemars = { version = "0.8", features = ["url"] } +structopt = "0.3" +tokio = { version = "1", features = ["full"] } +serde_json = "1" + +serde = "1" +tokio-stream = "0.1" +fluvio-future = { version = "0.3.15", features = ["subscriber"] } + +kafka = "0.9.0" diff --git a/rust-connectors/sources/kafka/README.md b/rust-connectors/sources/kafka/README.md new file mode 100644 index 00000000..44430a52 --- /dev/null +++ b/rust-connectors/sources/kafka/README.md @@ -0,0 +1,10 @@ +# Fluvio Slack Connector + +## Slack Sink Connector + +Controls the Source connector + +| Option | default | type | description | +| :--- | :--- | :--- | :---- | +| WEBHOOK_URL | - | String | The secret key for the slack webhook url | +| webhook-url | - | String | The paramaters key for the slack webhook url| diff --git a/rust-connectors/sources/kafka/src/main.rs b/rust-connectors/sources/kafka/src/main.rs new file mode 100644 index 00000000..82aa30c0 --- /dev/null +++ b/rust-connectors/sources/kafka/src/main.rs @@ -0,0 +1,69 @@ +use fluvio_connectors_common::opt::{CommonSourceOpt, Record}; +use fluvio_future::tracing::{debug, info}; +use kafka::client::{KafkaClient, ProduceMessage, RequiredAcks}; +use schemars::schema_for; +use schemars::JsonSchema; +use std::time::Duration; +use structopt::StructOpt; +use tokio_stream::StreamExt; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + if let Some("metadata") = std::env::args().nth(1).as_deref() { + let schema = serde_json::json!({ + "name": env!("CARGO_PKG_NAME"), + "version": env!("CARGO_PKG_VERSION"), + "description": env!("CARGO_PKG_DESCRIPTION"), + "direction": "Source", + "schema": schema_for!(KafkaOpt), + }); + println!("{}", serde_json::to_string_pretty(&schema).unwrap()); + return Ok(()); + } + let opts: KafkaOpt = KafkaOpt::from_args(); + opts.common.enable_logging(); + let _ = opts.execute().await?; + Ok(()) +} + +#[derive(StructOpt, Debug, JsonSchema, Clone)] +pub struct KafkaOpt { + #[structopt(long, env = "KAFKA_URL", hide_env_values = true)] + pub kafka_url: String, + + #[structopt(flatten)] + #[schemars(flatten)] + pub common: CommonSourceOpt, +} + +impl KafkaOpt { + pub async fn execute(&self) -> anyhow::Result<()> { + let mut stream = self.common.create_consumer_stream().await?; + let mut kafka_client = KafkaClient::new(vec![self.kafka_url.clone()]); + let _ = kafka_client.load_metadata_all()?; + + info!("Starting stream"); + while let Some(Ok(record)) = stream.next().await { + let _ = self.send_to_kafka(&record, &mut kafka_client).await; + } + Ok(()) + } + pub async fn send_to_kafka( + &self, + record: &Record, + kafka_client: &mut KafkaClient, + ) -> anyhow::Result<()> { + let text = String::from_utf8_lossy(record.value()); + debug!("Sending {:?}, to kafka", text); + let msg = ProduceMessage::new( + self.common.fluvio_topic.as_str(), + 0, + record.key(), + Some(record.value()), + ); + let req = vec![msg]; + let _resp = + kafka_client.produce_messages(RequiredAcks::One, Duration::from_millis(100), req); + Ok(()) + } +} From b31a907ed9ec7ec1d228744a4cdcb3a644370042 Mon Sep 17 00:00:00 2001 From: Sebastian Imlay Date: Fri, 20 May 2022 17:04:23 -0400 Subject: [PATCH 03/18] Use correct partition for kafka sink --- rust-connectors/sinks/kafka/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust-connectors/sinks/kafka/src/main.rs b/rust-connectors/sinks/kafka/src/main.rs index 9a6c35d8..5191ac3f 100644 --- a/rust-connectors/sinks/kafka/src/main.rs +++ b/rust-connectors/sinks/kafka/src/main.rs @@ -58,7 +58,7 @@ impl KafkaOpt { debug!("Sending {:?}, to kafka", text); let msg = ProduceMessage::new( self.common.fluvio_topic.as_str(), - 0, + self.common.fluvio_partition, record.key(), Some(record.value()), ); From aea00277108e37726fc615ed4ab0010dbbf75f37 Mon Sep 17 00:00:00 2001 From: Sebastian Imlay Date: Thu, 2 Jun 2022 12:29:56 -0400 Subject: [PATCH 04/18] wip --- .github/workflows/ci.yml | 6 +++ rust-connectors/sinks/kafka/src/main.rs | 9 +++- rust-connectors/sources/kafka/src/main.rs | 65 +++++++++++++---------- 3 files changed, 50 insertions(+), 30 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a501db41..5640be15 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -83,6 +83,8 @@ jobs: - postgres-sink - slack-sink - dynamodb-sink + - kafka-source + - kafka-sink rust: [stable] env: RUST_BACKTRACE: full @@ -145,6 +147,8 @@ jobs: - postgres-sink - slack-sink - dynamodb-sink + - kafka-source + - kafka-sink env: RUSTV: ${{ matrix.rust }} TARGET: ${{ matrix.rust-target }} @@ -318,6 +322,8 @@ jobs: - postgres-sink - slack-sink - dynamodb-sink + - kafka-source + - kafka-sink steps: - name: Download ${{ matrix.connector-name }} x86_64-unknown-linux-musl Docker Image as Artifact uses: actions/download-artifact@v3 diff --git a/rust-connectors/sinks/kafka/src/main.rs b/rust-connectors/sinks/kafka/src/main.rs index 5191ac3f..efe89ab8 100644 --- a/rust-connectors/sinks/kafka/src/main.rs +++ b/rust-connectors/sinks/kafka/src/main.rs @@ -32,6 +32,9 @@ pub struct KafkaOpt { #[structopt(long, env = "KAFKA_URL", hide_env_values = true)] pub kafka_url: String, + #[structopt(long)] + pub kafka_topic: Option, + #[structopt(flatten)] #[schemars(flatten)] pub common: CommonSourceOpt, @@ -54,10 +57,14 @@ impl KafkaOpt { record: &Record, kafka_client: &mut KafkaClient, ) -> anyhow::Result<()> { + let kafka_topic = self + .kafka_topic + .as_ref() + .unwrap_or(&self.common.fluvio_topic); let text = String::from_utf8_lossy(record.value()); debug!("Sending {:?}, to kafka", text); let msg = ProduceMessage::new( - self.common.fluvio_topic.as_str(), + kafka_topic, self.common.fluvio_partition, record.key(), Some(record.value()), diff --git a/rust-connectors/sources/kafka/src/main.rs b/rust-connectors/sources/kafka/src/main.rs index 82aa30c0..9e6e2977 100644 --- a/rust-connectors/sources/kafka/src/main.rs +++ b/rust-connectors/sources/kafka/src/main.rs @@ -1,11 +1,9 @@ -use fluvio_connectors_common::opt::{CommonSourceOpt, Record}; +use fluvio_connectors_common::opt::CommonSourceOpt; use fluvio_future::tracing::{debug, info}; -use kafka::client::{KafkaClient, ProduceMessage, RequiredAcks}; +use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage}; use schemars::schema_for; use schemars::JsonSchema; -use std::time::Duration; use structopt::StructOpt; -use tokio_stream::StreamExt; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -31,6 +29,12 @@ pub struct KafkaOpt { #[structopt(long, env = "KAFKA_URL", hide_env_values = true)] pub kafka_url: String, + #[structopt(long)] + pub kafka_group: Option, + + #[structopt(long)] + pub kafka_topic: Option, + #[structopt(flatten)] #[schemars(flatten)] pub common: CommonSourceOpt, @@ -38,32 +42,35 @@ pub struct KafkaOpt { impl KafkaOpt { pub async fn execute(&self) -> anyhow::Result<()> { - let mut stream = self.common.create_consumer_stream().await?; - let mut kafka_client = KafkaClient::new(vec![self.kafka_url.clone()]); - let _ = kafka_client.load_metadata_all()?; + let producer = self.common.create_producer().await?; + info!("Connected to fluvio!"); + let kafka_topic = self + .kafka_topic + .as_ref() + .unwrap_or(&self.common.fluvio_topic); + + let mut consumer = Consumer::from_hosts(vec![self.kafka_url.clone()]) + .with_topic_partitions(kafka_topic.clone(), &[self.common.fluvio_partition]) + .with_fallback_offset(FetchOffset::Earliest) + .with_group( + self.kafka_group + .clone() + .unwrap_or_else(|| "fluvio-kafka-source".to_string()), + ) + .with_offset_storage(GroupOffsetStorage::Kafka) + .create()?; - info!("Starting stream"); - while let Some(Ok(record)) = stream.next().await { - let _ = self.send_to_kafka(&record, &mut kafka_client).await; + info!("Connected to kafka!"); + loop { + for ms in consumer.poll().unwrap().iter() { + for m in ms.messages() { + let _ = producer.send(m.key, m.value).await?; + + debug!("{:?}", m); + } + let _ = consumer.consume_messageset(ms)?; + } + consumer.commit_consumed().unwrap(); } - Ok(()) - } - pub async fn send_to_kafka( - &self, - record: &Record, - kafka_client: &mut KafkaClient, - ) -> anyhow::Result<()> { - let text = String::from_utf8_lossy(record.value()); - debug!("Sending {:?}, to kafka", text); - let msg = ProduceMessage::new( - self.common.fluvio_topic.as_str(), - 0, - record.key(), - Some(record.value()), - ); - let req = vec![msg]; - let _resp = - kafka_client.produce_messages(RequiredAcks::One, Duration::from_millis(100), req); - Ok(()) } } From 483c2a07f8c6ac5bf4446cc79d6d2fe4939467a4 Mon Sep 17 00:00:00 2001 From: Sebastian Imlay Date: Tue, 7 Jun 2022 17:06:29 -0400 Subject: [PATCH 05/18] Updates --- Cargo.lock | 143 +++++++++++----------- rust-connectors/sinks/kafka/src/main.rs | 8 +- rust-connectors/sources/kafka/src/main.rs | 2 +- 3 files changed, 76 insertions(+), 77 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5b19a80c..3d3c14f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -164,17 +164,16 @@ dependencies = [ [[package]] name = "async-global-executor" -version = "2.0.4" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c290043c9a95b05d45e952fb6383c67bcb61471f60cfa21e890dba6654234f43" +checksum = "fd8b508d585e01084059b60f06ade4cb7415cd2e4084b71dd1cb44e7d3fb9880" dependencies = [ "async-channel", "async-executor", "async-io", - "async-mutex", + "async-lock", "blocking", "futures-lite", - "num_cpus", "once_cell", "tokio", ] @@ -197,9 +196,9 @@ dependencies = [ [[package]] name = "async-io" -version = "1.6.0" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a811e6a479f2439f0c04038796b5cfb3d2ad56c230e0f2d3f7b04d68cfee607b" +checksum = "e5e18f61464ae81cde0a23e713ae8fd299580c54d697a35820cfd0625b8b0e07" dependencies = [ "concurrent-queue", "futures-lite", @@ -342,9 +341,9 @@ checksum = "30696a84d817107fc028e049980e09d5e140e8da8f1caeb17e8e950658a3cea9" [[package]] name = "async-trait" -version = "0.1.53" +version = "0.1.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed6aa3524a2dfcf9fe180c51eae2b58738348d819517ceadf95789c51fff7600" +checksum = "96cf8829f67d2eab0b2dfa42c5d0ef737e0724e4a82b01b3e292456202b19716" dependencies = [ "proc-macro2", "quote", @@ -420,7 +419,7 @@ dependencies = [ "aws-types", "bytes", "hex", - "http 0.2.7", + "http 0.2.8", "hyper", "ring", "tokio", @@ -437,7 +436,7 @@ checksum = "5279590d48e92b287f864e099c7e851af03a5e184a57cec0959872cee297c7a0" dependencies = [ "aws-smithy-http", "aws-types", - "http 0.2.7", + "http 0.2.8", "regex", "tracing", ] @@ -451,7 +450,7 @@ dependencies = [ "aws-smithy-http", "aws-smithy-types", "aws-types", - "http 0.2.7", + "http 0.2.8", "lazy_static", "percent-encoding", "tracing", @@ -475,7 +474,7 @@ dependencies = [ "aws-types", "bytes", "fastrand", - "http 0.2.7", + "http 0.2.8", "tokio-stream", "tower", ] @@ -497,7 +496,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "http 0.2.7", + "http 0.2.8", "tokio-stream", "tower", ] @@ -520,7 +519,7 @@ dependencies = [ "aws-smithy-xml", "aws-types", "bytes", - "http 0.2.7", + "http 0.2.8", "tower", ] @@ -533,7 +532,7 @@ dependencies = [ "aws-sigv4", "aws-smithy-http", "aws-types", - "http 0.2.7", + "http 0.2.8", "thiserror", "tracing", ] @@ -547,7 +546,7 @@ dependencies = [ "aws-smithy-http", "form_urlencoded", "hex", - "http 0.2.7", + "http 0.2.8", "once_cell", "percent-encoding", "regex", @@ -580,7 +579,7 @@ dependencies = [ "aws-smithy-types", "bytes", "fastrand", - "http 0.2.7", + "http 0.2.8", "http-body", "hyper", "hyper-rustls", @@ -602,7 +601,7 @@ dependencies = [ "bytes", "bytes-utils", "futures-core", - "http 0.2.7", + "http 0.2.8", "http-body", "hyper", "once_cell", @@ -621,7 +620,7 @@ checksum = "101a2e213acebe624cfb9bfc944de5e33c849e0df0f09c3d3aa3b54368dbe7af" dependencies = [ "aws-smithy-http", "bytes", - "http 0.2.7", + "http 0.2.8", "http-body", "pin-project", "tower", @@ -789,9 +788,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.9.1" +version = "3.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a45a46ab1f2412e53d3a0ade76ffad2025804294569aae387231a0cd6e0899" +checksum = "37ccbd214614c6783386c1af30caf03192f17891059cecc394b4fb119e363de3" [[package]] name = "byteorder" @@ -1507,21 +1506,19 @@ dependencies = [ [[package]] name = "flate2" -version = "1.0.23" +version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b39522e96686d38f4bc984b9198e3a0613264abaebaff2c5c918bfa6b6da09af" +checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6" dependencies = [ - "cfg-if 1.0.0", "crc32fast", - "libc", "miniz_oxide", ] [[package]] name = "fluvio" -version = "0.12.11" +version = "0.12.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33255695d2f7020fb0a9fa8642ed3bd3eb97c3b789d2ecdbae9d2f84eb3d512c" +checksum = "9271c3ea409b35e68d57c50776ea486766b03ad08eeee1eefcfea494766833a0" dependencies = [ "async-channel", "async-lock", @@ -1651,9 +1648,9 @@ dependencies = [ [[package]] name = "fluvio-future" -version = "0.3.17" +version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc6a89ad97db849764e15c37fa98146599d97e1e5c10460ec3b73d67277b8de9" +checksum = "1e2d1fad8e412d44c516add2349a34f6d71d9279b108c8e012325ab17fcbf982" dependencies = [ "async-io", "async-net", @@ -1691,7 +1688,7 @@ dependencies = [ "bytes", "fluvio-future", "fluvio-protocol-derive", - "tokio-util 0.7.2", + "tokio-util 0.7.3", "tracing", ] @@ -1781,7 +1778,7 @@ dependencies = [ "pin-project", "thiserror", "tokio", - "tokio-util 0.7.2", + "tokio-util 0.7.3", "tracing", ] @@ -1842,9 +1839,9 @@ dependencies = [ [[package]] name = "fluvio-types" -version = "0.3.6" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34e8812836b82cdeeee3448cfac91ad6957b25e0e92826cbd1a51a240cfef1a9" +checksum = "5417d908546297c524804167a0a8714db6512117d1b0fa52cf434e7bfc1577df" dependencies = [ "event-listener", "thiserror", @@ -2118,11 +2115,11 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http 0.2.7", + "http 0.2.8", "indexmap", "slab", "tokio", - "tokio-util 0.7.2", + "tokio-util 0.7.3", "tracing", ] @@ -2217,9 +2214,9 @@ dependencies = [ [[package]] name = "http" -version = "0.2.7" +version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff8670570af52249509a86f5e3e18a08c60b177071826898fde8997cf5f6bfbb" +checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399" dependencies = [ "bytes", "fnv", @@ -2228,12 +2225,12 @@ dependencies = [ [[package]] name = "http-body" -version = "0.4.4" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6" +checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ "bytes", - "http 0.2.7", + "http 0.2.8", "pin-project-lite 0.2.9", ] @@ -2300,16 +2297,16 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "0.14.18" +version = "0.14.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b26ae0a80afebe130861d90abf98e3814a4f28a4c6ffeb5ab8ebb2be311e0ef2" +checksum = "42dc3c131584288d375f2d07f822b0cb012d8c6fb899a5b9fdb3cb7eb9b6004f" dependencies = [ "bytes", "futures-channel", "futures-core", "futures-util", "h2", - "http 0.2.7", + "http 0.2.8", "http-body", "httparse", "httpdate", @@ -2377,9 +2374,9 @@ checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683" [[package]] name = "indexmap" -version = "1.8.1" +version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f647032dfaa1f8b6dc29bd3edb7bbef4861b8b8007ebb118d6db284fd59f6ee" +checksum = "e6012d540c5baa3589337a98ce73408de9b5a25ec9fc2c6fd6be8f0d39e0ca5a" dependencies = [ "autocfg", "hashbrown", @@ -2611,9 +2608,9 @@ dependencies = [ [[package]] name = "lz4_flex" -version = "0.9.2" +version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42c51df9d8d4842336c835df1d85ed447c4813baa237d033d95128bf5552ad8a" +checksum = "74141c8af4bb8136dafb5705826bdd9dce823021db897c1129191804140ddf84" dependencies = [ "twox-hash", ] @@ -2689,9 +2686,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" [[package]] name = "miniz_oxide" -version = "0.5.1" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2b29bd4bc3f33391105ebee3589c19197c4271e3e5a9ec9bfe8127eeff8f082" +checksum = "6f5c75688da582b8ffc1f1799e9db273f32133c49e048f614d22ec3256773ccc" dependencies = [ "adler", ] @@ -2866,9 +2863,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.11.0" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b10983b38c53aebdf33f542c6275b0f58a238129d00c4ae0e6fb59738d783ca" +checksum = "7709cef83f0c1f58f666e746a08b21e0085f7440fa6a29cc194d68aac97a4225" [[package]] name = "opaque-debug" @@ -2919,9 +2916,9 @@ dependencies = [ [[package]] name = "openssl-sys" -version = "0.9.73" +version = "0.9.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d5fd19fb3e0a8191c1e34935718976a3e70c112ab9a24af6d7cadccd9d90bc0" +checksum = "835363342df5fba8354c5b453325b110ffd54044e588c539cf2f20a8014e4cb1" dependencies = [ "autocfg", "cc", @@ -2950,9 +2947,9 @@ dependencies = [ [[package]] name = "parking_lot" -version = "0.12.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", "parking_lot_core 0.9.3", @@ -3466,7 +3463,7 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http 0.2.7", + "http 0.2.8", "http-body", "hyper", "hyper-tls", @@ -3557,7 +3554,7 @@ dependencies = [ "async-channel", "async-tungstenite", "bytes", - "http 0.2.7", + "http 0.2.8", "log", "pollster", "rustls-pemfile 0.3.0", @@ -4181,9 +4178,9 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.95" +version = "1.0.96" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbaf6116ab8924f39d52792136fb74fd60a80194cf1b1c6ffa6453eef1c3f942" +checksum = "0748dd251e24453cb8717f0354206b91557e4ec8703673a4b30208f2abaf1ebf" dependencies = [ "proc-macro2", "quote", @@ -4202,9 +4199,9 @@ dependencies = [ [[package]] name = "target-lexicon" -version = "0.12.3" +version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7fa7e55043acb85fca6b3c01485a2eeb6b69c5d21002e273c79e465f43b7ac1" +checksum = "c02424087780c9b71cc96799eaeddff35af2bc513278cda5c99fc1f5d026d3c1" [[package]] name = "tempfile" @@ -4380,9 +4377,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.18.2" +version = "1.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4903bf0427cf68dddd5aa6a93220756f8be0c34fcfa9f5e6191e103e15a31395" +checksum = "c51a52ed6686dd62c320f9b89299e9dfb46f730c7a48e635c19f21d116cb1439" dependencies = [ "bytes", "libc", @@ -4390,7 +4387,7 @@ dependencies = [ "mio", "num_cpus", "once_cell", - "parking_lot 0.12.0", + "parking_lot 0.12.1", "pin-project-lite 0.2.9", "signal-hook-registry", "socket2", @@ -4400,9 +4397,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7" +checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484" dependencies = [ "proc-macro2", "quote", @@ -4465,9 +4462,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" +checksum = "df54d54117d6fdc4e4fea40fe1e4e566b3505700e148a6827e59b34b0d2600d9" dependencies = [ "futures-core", "pin-project-lite 0.2.9", @@ -4490,9 +4487,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f988a1a1adc2fb21f9c12aa96441da33a1728193ae0b95d2be22dbd17fcb4e5c" +checksum = "cc463cd8deddc3770d20f9852143d50bf6094e640b485cb2e189a2099085ff45" dependencies = [ "bytes", "futures-core", @@ -4618,7 +4615,7 @@ dependencies = [ "base64 0.13.0", "byteorder", "bytes", - "http 0.2.7", + "http 0.2.8", "httparse", "log", "rand 0.8.5", @@ -4723,9 +4720,9 @@ checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" [[package]] name = "uuid" -version = "1.0.0" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cfcd319456c4d6ea10087ed423473267e1a071f3bc0aa89f80d60997843c6f0" +checksum = "c6d5d669b51467dcf7b2f1a796ce0f955f05f01cafda6c19d6e95f730df29238" dependencies = [ "getrandom 0.2.6", "serde", diff --git a/rust-connectors/sinks/kafka/src/main.rs b/rust-connectors/sinks/kafka/src/main.rs index efe89ab8..12f19979 100644 --- a/rust-connectors/sinks/kafka/src/main.rs +++ b/rust-connectors/sinks/kafka/src/main.rs @@ -44,11 +44,12 @@ impl KafkaOpt { pub async fn execute(&self) -> anyhow::Result<()> { let mut stream = self.common.create_consumer_stream().await?; let mut kafka_client = KafkaClient::new(vec![self.kafka_url.clone()]); + //let mut kafka_client = KafkaClient::new(vec![self.kafka_url.clone()]); let _ = kafka_client.load_metadata_all()?; info!("Starting stream"); while let Some(Ok(record)) = stream.next().await { - let _ = self.send_to_kafka(&record, &mut kafka_client).await; + let _ = self.send_to_kafka(&record, &mut kafka_client).await?; } Ok(()) } @@ -70,8 +71,9 @@ impl KafkaOpt { Some(record.value()), ); let req = vec![msg]; - let _resp = - kafka_client.produce_messages(RequiredAcks::One, Duration::from_millis(100), req)?; + let resp = + kafka_client.produce_messages(RequiredAcks::All, Duration::from_millis(10000), req)?; + debug!("Sent {:?}", resp); Ok(()) } } diff --git a/rust-connectors/sources/kafka/src/main.rs b/rust-connectors/sources/kafka/src/main.rs index 9e6e2977..e9cbfd28 100644 --- a/rust-connectors/sources/kafka/src/main.rs +++ b/rust-connectors/sources/kafka/src/main.rs @@ -70,7 +70,7 @@ impl KafkaOpt { } let _ = consumer.consume_messageset(ms)?; } - consumer.commit_consumed().unwrap(); + let _ = consumer.commit_consumed()?; } } } From 75f8f407f4ff492ee29af118a53c337e4257e55f Mon Sep 17 00:00:00 2001 From: Sebastian Imlay Date: Tue, 7 Jun 2022 17:26:21 -0400 Subject: [PATCH 06/18] Try out rdkafka --- Cargo.lock | 130 +++++++++++++++++++++++- rust-connectors/sinks/kafka/Cargo.toml | 3 +- rust-connectors/sinks/kafka/src/main.rs | 20 +++- 3 files changed, 149 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3d3c14f1..28dfca6d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -893,6 +893,15 @@ dependencies = [ "vec_map", ] +[[package]] +name = "cmake" +version = "0.1.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8ad8cef104ac57b68b89df3208164d228503abbdce70f6880ffa3d970e7443a" +dependencies = [ + "cc", +] + [[package]] name = "color-backtrace" version = "0.5.1" @@ -1355,6 +1364,18 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" +[[package]] +name = "duct" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc6a0a59ed0888e0041cf708e66357b7ae1a82f1c67247e1f93b5e0818f7d8d" +dependencies = [ + "libc", + "once_cell", + "os_pipe", + "shared_child", +] + [[package]] name = "dyn-clone" version = "1.0.5" @@ -2501,7 +2522,7 @@ dependencies = [ "anyhow", "fluvio-connectors-common", "fluvio-future", - "kafka", + "rdkafka", "schemars", "serde", "serde_json", @@ -2573,6 +2594,18 @@ version = "0.2.126" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "349d5a591cd28b49e1d1037471617a32ddcda5731b99419008085f72d5a53836" +[[package]] +name = "libz-sys" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9702761c3935f8cc2f101793272e202c72b99da8f4224a19ddcf1279a6450bbf" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linked-hash-map" version = "0.5.4" @@ -2832,6 +2865,27 @@ dependencies = [ "libc", ] +[[package]] +name = "num_enum" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf5395665662ef45796a4ff5486c5d41d29e0c09640af4c5f17fd94ee2c119c9" +dependencies = [ + "num_enum_derive", +] + +[[package]] +name = "num_enum_derive" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b0498641e53dd6ac1a4f22547548caa6864cc4933784319cd1775271c5a46ce" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "num_threads" version = "0.1.6" @@ -2928,6 +2982,16 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "os_pipe" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb233f06c2307e1f5ce2ecad9f8121cffbbee2c95428f44ea85222e460d0d213" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "parking" version = "2.0.0" @@ -3215,6 +3279,16 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" +[[package]] +name = "proc-macro-crate" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e17d47ce914bf4de440332250b0edd23ce48c005f59fab39d3335866b114f11a" +dependencies = [ + "thiserror", + "toml", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -3367,6 +3441,38 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "rdkafka" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8acd8f5c5482fdf89e8878227bafa442d8c4409f6287391c85549ca83626c27" +dependencies = [ + "futures", + "libc", + "log", + "rdkafka-sys", + "serde", + "serde_derive", + "serde_json", + "slab", + "tokio", +] + +[[package]] +name = "rdkafka-sys" +version = "3.0.0+1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca35e95c88e08cdc643b25744e38ccee7c93c7e90d1ac6850fe74cbaa40803c3" +dependencies = [ + "cmake", + "libc", + "libz-sys", + "num_enum", + "openssl-sys", + "pkg-config", + "sasl2-sys", +] + [[package]] name = "redox_syscall" version = "0.2.13" @@ -3691,6 +3797,18 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "sasl2-sys" +version = "0.1.20+2.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e645bd98535fc8fd251c43ba7c7c1f9be1e0369c99b6a5ea719052a773e655c" +dependencies = [ + "cc", + "duct", + "libc", + "pkg-config", +] + [[package]] name = "schannel" version = "0.1.20" @@ -3952,6 +4070,16 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shared_child" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6be9f7d5565b1483af3e72975e2dee33879b3b86bd48c0929fccf6585d79e65a" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "signal-hook" version = "0.3.14" diff --git a/rust-connectors/sinks/kafka/Cargo.toml b/rust-connectors/sinks/kafka/Cargo.toml index 3a151c7f..67ad376c 100644 --- a/rust-connectors/sinks/kafka/Cargo.toml +++ b/rust-connectors/sinks/kafka/Cargo.toml @@ -17,4 +17,5 @@ serde = "1" tokio-stream = "0.1" fluvio-future = { version = "0.3.15", features = ["subscriber"] } -kafka = "0.9.0" +#kafka = "0.9.0" +rdkafka = { version = "0.25", features = ["cmake-build", "sasl", "ssl-vendored", "ssl"] } diff --git a/rust-connectors/sinks/kafka/src/main.rs b/rust-connectors/sinks/kafka/src/main.rs index 12f19979..438b7fc0 100644 --- a/rust-connectors/sinks/kafka/src/main.rs +++ b/rust-connectors/sinks/kafka/src/main.rs @@ -5,8 +5,6 @@ use schemars::JsonSchema; use structopt::StructOpt; use tokio_stream::StreamExt; -use kafka::client::{KafkaClient, ProduceMessage, RequiredAcks}; -use std::time::Duration; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -41,6 +39,23 @@ pub struct KafkaOpt { } impl KafkaOpt { + pub async fn execute(&self) -> anyhow::Result<()> { + let mut stream = self.common.create_consumer_stream().await?; + + info!("Starting stream"); + while let Some(Ok(record)) = stream.next().await { + let _ = self.send_to_kafka(&record).await?; + } + Ok(()) + } + pub async fn send_to_kafka( + &self, + record: &Record, + //kafka_client: &mut KafkaClient, + ) -> anyhow::Result<()> { + todo!() + } + /* pub async fn execute(&self) -> anyhow::Result<()> { let mut stream = self.common.create_consumer_stream().await?; let mut kafka_client = KafkaClient::new(vec![self.kafka_url.clone()]); @@ -76,4 +91,5 @@ impl KafkaOpt { debug!("Sent {:?}", resp); Ok(()) } + */ } From ed3fbf9da8470f318acca44574cafccd6005845c Mon Sep 17 00:00:00 2001 From: Sebastian Imlay Date: Tue, 7 Jun 2022 17:29:08 -0400 Subject: [PATCH 07/18] clippy --- rust-connectors/sinks/kafka/src/main.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/rust-connectors/sinks/kafka/src/main.rs b/rust-connectors/sinks/kafka/src/main.rs index 438b7fc0..df2fa348 100644 --- a/rust-connectors/sinks/kafka/src/main.rs +++ b/rust-connectors/sinks/kafka/src/main.rs @@ -1,11 +1,10 @@ use fluvio_connectors_common::opt::{CommonSourceOpt, Record}; -use fluvio_future::tracing::{debug, info}; +use fluvio_future::tracing::{info}; use schemars::schema_for; use schemars::JsonSchema; use structopt::StructOpt; use tokio_stream::StreamExt; - #[tokio::main] async fn main() -> anyhow::Result<()> { if let Some("metadata") = std::env::args().nth(1).as_deref() { @@ -50,7 +49,7 @@ impl KafkaOpt { } pub async fn send_to_kafka( &self, - record: &Record, + _record: &Record, //kafka_client: &mut KafkaClient, ) -> anyhow::Result<()> { todo!() From 9734edce22d998bea31e938b5a3758b1f56b87b3 Mon Sep 17 00:00:00 2001 From: Sebastian Imlay Date: Tue, 7 Jun 2022 17:46:31 -0400 Subject: [PATCH 08/18] cargo fmt --- rust-connectors/sinks/kafka/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust-connectors/sinks/kafka/src/main.rs b/rust-connectors/sinks/kafka/src/main.rs index df2fa348..83776a34 100644 --- a/rust-connectors/sinks/kafka/src/main.rs +++ b/rust-connectors/sinks/kafka/src/main.rs @@ -1,5 +1,5 @@ use fluvio_connectors_common::opt::{CommonSourceOpt, Record}; -use fluvio_future::tracing::{info}; +use fluvio_future::tracing::info; use schemars::schema_for; use schemars::JsonSchema; use structopt::StructOpt; From 56403ac254abd36469b18bc9e444e5436d224ab6 Mon Sep 17 00:00:00 2001 From: Sebastian Imlay Date: Wed, 8 Jun 2022 11:48:46 -0400 Subject: [PATCH 09/18] see if CI passes --- .github/workflows/ci.yml | 4 ++ Cargo.lock | 8 ++-- rust-connectors/sinks/kafka/Cargo.toml | 2 +- rust-connectors/sinks/kafka/src/main.rs | 58 ++++++++++--------------- 4 files changed, 32 insertions(+), 40 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5640be15..dd88ae40 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -114,6 +114,10 @@ jobs: - uses: Swatinem/rust-cache@v1 with: key: ${{ matrix.os }}-${{ matrix.rust-target }}-${{ matrix.connector-name }} + + - name: Build + run: | + sudo snap install libsasl2-dev - name: Build env: CONNECTOR_NAME: ${{ matrix.connector-name }} diff --git a/Cargo.lock b/Cargo.lock index d9108808..e6893b9f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3443,9 +3443,9 @@ dependencies = [ [[package]] name = "rdkafka" -version = "0.25.0" +version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8acd8f5c5482fdf89e8878227bafa442d8c4409f6287391c85549ca83626c27" +checksum = "1de127f294f2dba488ed46760b129d5ecbeabbd337ccbf3739cb29d50db2161c" dependencies = [ "futures", "libc", @@ -3460,9 +3460,9 @@ dependencies = [ [[package]] name = "rdkafka-sys" -version = "3.0.0+1.6.0" +version = "4.2.0+1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca35e95c88e08cdc643b25744e38ccee7c93c7e90d1ac6850fe74cbaa40803c3" +checksum = "9e542c6863b04ce0fa0c5719bc6b7b348cf8dd21af1bb03c9db5f9805b2a6473" dependencies = [ "cmake", "libc", diff --git a/rust-connectors/sinks/kafka/Cargo.toml b/rust-connectors/sinks/kafka/Cargo.toml index 67ad376c..457560bc 100644 --- a/rust-connectors/sinks/kafka/Cargo.toml +++ b/rust-connectors/sinks/kafka/Cargo.toml @@ -18,4 +18,4 @@ tokio-stream = "0.1" fluvio-future = { version = "0.3.15", features = ["subscriber"] } #kafka = "0.9.0" -rdkafka = { version = "0.25", features = ["cmake-build", "sasl", "ssl-vendored", "ssl"] } +rdkafka = { version = "0.28", features = ["cmake-build", "sasl", "ssl-vendored", "ssl"] } diff --git a/rust-connectors/sinks/kafka/src/main.rs b/rust-connectors/sinks/kafka/src/main.rs index 83776a34..90ef2297 100644 --- a/rust-connectors/sinks/kafka/src/main.rs +++ b/rust-connectors/sinks/kafka/src/main.rs @@ -1,5 +1,5 @@ use fluvio_connectors_common::opt::{CommonSourceOpt, Record}; -use fluvio_future::tracing::info; +use fluvio_future::tracing::{debug, info}; use schemars::schema_for; use schemars::JsonSchema; use structopt::StructOpt; @@ -37,58 +37,46 @@ pub struct KafkaOpt { pub common: CommonSourceOpt, } +use rdkafka::config::ClientConfig; +use rdkafka::producer::{FutureProducer, FutureRecord}; +use std::time::Duration; + impl KafkaOpt { pub async fn execute(&self) -> anyhow::Result<()> { let mut stream = self.common.create_consumer_stream().await?; - info!("Starting stream"); - while let Some(Ok(record)) = stream.next().await { - let _ = self.send_to_kafka(&record).await?; - } - Ok(()) - } - pub async fn send_to_kafka( - &self, - _record: &Record, - //kafka_client: &mut KafkaClient, - ) -> anyhow::Result<()> { - todo!() - } - /* - pub async fn execute(&self) -> anyhow::Result<()> { - let mut stream = self.common.create_consumer_stream().await?; - let mut kafka_client = KafkaClient::new(vec![self.kafka_url.clone()]); - //let mut kafka_client = KafkaClient::new(vec![self.kafka_url.clone()]); - let _ = kafka_client.load_metadata_all()?; + let producer: &FutureProducer = &ClientConfig::new() + .set("bootstrap.servers", "") + .set("security.protocol", "SASL_SSL") + .set("sasl.mechanisms", "PLAIN") + .set("sasl.username", "") + .set("sasl.password", "") + .set("session.timeout.ms", "45000") + .create() + .expect("Producer creation error"); info!("Starting stream"); while let Some(Ok(record)) = stream.next().await { - let _ = self.send_to_kafka(&record, &mut kafka_client).await?; + let _ = self.send_to_kafka(&record, producer).await?; } Ok(()) } pub async fn send_to_kafka( &self, record: &Record, - kafka_client: &mut KafkaClient, + kafka_producer: &FutureProducer, ) -> anyhow::Result<()> { let kafka_topic = self .kafka_topic .as_ref() .unwrap_or(&self.common.fluvio_topic); - let text = String::from_utf8_lossy(record.value()); - debug!("Sending {:?}, to kafka", text); - let msg = ProduceMessage::new( - kafka_topic, - self.common.fluvio_partition, - record.key(), - Some(record.value()), - ); - let req = vec![msg]; - let resp = - kafka_client.produce_messages(RequiredAcks::All, Duration::from_millis(10000), req)?; - debug!("Sent {:?}", resp); + let kafka_record = FutureRecord::to(kafka_topic.as_str()) + .payload(record.value()) + .key(record.key().unwrap_or(&[])); + let res = kafka_producer + .send(kafka_record, Duration::from_secs(0)) + .await; + debug!("Kafka produce {:?}", res); Ok(()) } - */ } From cfd4206ddb7253d4840cb9a4c4c7004892690136 Mon Sep 17 00:00:00 2001 From: Sebastian Imlay Date: Wed, 8 Jun 2022 11:51:44 -0400 Subject: [PATCH 10/18] maybe fix ci --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index dd88ae40..380d0cb3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -117,7 +117,7 @@ jobs: - name: Build run: | - sudo snap install libsasl2-dev + sudo apt-get install -y libsasl2-dev - name: Build env: CONNECTOR_NAME: ${{ matrix.connector-name }} From af481444bbfab730ccb28856c577fae3a399131e Mon Sep 17 00:00:00 2001 From: Sebastian Imlay Date: Wed, 8 Jun 2022 12:06:29 -0400 Subject: [PATCH 11/18] Maybe fix build --- .github/workflows/ci.yml | 4 ---- Cargo.lock | 10 ++++++++++ rust-connectors/sinks/kafka/Cargo.toml | 2 +- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 380d0cb3..5640be15 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -114,10 +114,6 @@ jobs: - uses: Swatinem/rust-cache@v1 with: key: ${{ matrix.os }}-${{ matrix.rust-target }}-${{ matrix.connector-name }} - - - name: Build - run: | - sudo apt-get install -y libsasl2-dev - name: Build env: CONNECTOR_NAME: ${{ matrix.connector-name }} diff --git a/Cargo.lock b/Cargo.lock index e6893b9f..7847f6e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2567,6 +2567,15 @@ dependencies = [ "libc", ] +[[package]] +name = "krb5-src" +version = "0.3.2+1.19.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44cd3b7e7735d48bc3793837041294f2eb747bd0f63bbc081e89972abb9e48fb" +dependencies = [ + "duct", +] + [[package]] name = "kv-log-macro" version = "1.0.7" @@ -3805,6 +3814,7 @@ checksum = "9e645bd98535fc8fd251c43ba7c7c1f9be1e0369c99b6a5ea719052a773e655c" dependencies = [ "cc", "duct", + "krb5-src", "libc", "pkg-config", ] diff --git a/rust-connectors/sinks/kafka/Cargo.toml b/rust-connectors/sinks/kafka/Cargo.toml index 457560bc..43a38127 100644 --- a/rust-connectors/sinks/kafka/Cargo.toml +++ b/rust-connectors/sinks/kafka/Cargo.toml @@ -18,4 +18,4 @@ tokio-stream = "0.1" fluvio-future = { version = "0.3.15", features = ["subscriber"] } #kafka = "0.9.0" -rdkafka = { version = "0.28", features = ["cmake-build", "sasl", "ssl-vendored", "ssl"] } +rdkafka = { version = "0.28", features = ["cmake-build", "ssl-vendored", "ssl", "gssapi", "gssapi-vendored"] } From 2494af3fb1e62ea7d4cbad320fbd408f8a4e1d92 Mon Sep 17 00:00:00 2001 From: Sebastian Imlay Date: Wed, 8 Jun 2022 15:03:53 -0400 Subject: [PATCH 12/18] Try fixing ci --- Makefile | 4 ++-- rust-connectors/sinks/kafka/Cargo.toml | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index d546c100..7507d3aa 100644 --- a/Makefile +++ b/Makefile @@ -73,8 +73,8 @@ FLUVIO_BUILD_ZIG ?= zig FLUVIO_BUILD_LLD ?= lld CC_aarch64_unknown_linux_musl=$(PWD)/build-scripts/aarch64-linux-musl-zig-cc CC_x86_64_unknown_linux_musl=$(PWD)/build-scripts/x86_64-linux-musl-zig-cc +CXX_x86_64_unknown_linux_musl=$(PWD)/build-scripts/x86_64-linux-musl-zig-c++ +CXX_aarch64_unknown_linux_musl=$(PWD)/build-scripts/aarch64-linux-musl-zig-c++ CARGO_TARGET_AARCH64_UNKNOWN_LINUX_MUSL_LINKER=$(PWD)/build-scripts/ld.lld CARGO_TARGET_X86_64_UNKNOWN_LINUX_MUSL_LINKER=$(PWD)/build-scripts/ld.lld -CMAKE_CXX_COMPILER_x86_64_unknown_linux_musl=$(PWD)/build-scripts/x86_64-linux-musl-zig-c++ -CMAKE_C_COMPILER_x86_64_unknown_linux_musl=$(PWD)/build-scripts/x86_64-linux-musl-zig-cc DOCKER_TAG=$(CONNECTOR_VERSION)-$(GIT_COMMIT) diff --git a/rust-connectors/sinks/kafka/Cargo.toml b/rust-connectors/sinks/kafka/Cargo.toml index 43a38127..0588e6f8 100644 --- a/rust-connectors/sinks/kafka/Cargo.toml +++ b/rust-connectors/sinks/kafka/Cargo.toml @@ -17,5 +17,4 @@ serde = "1" tokio-stream = "0.1" fluvio-future = { version = "0.3.15", features = ["subscriber"] } -#kafka = "0.9.0" -rdkafka = { version = "0.28", features = ["cmake-build", "ssl-vendored", "ssl", "gssapi", "gssapi-vendored"] } +rdkafka = { version = "0.28", features = ["cmake-build", "ssl-vendored", "ssl", "gssapi", "gssapi-vendored", "libz-static"] } From 9cf08f9b4405a2b6194973de04a609877422cd98 Mon Sep 17 00:00:00 2001 From: Sebastian Imlay Date: Wed, 8 Jun 2022 15:07:19 -0400 Subject: [PATCH 13/18] Add zig-c++ scripts --- build-scripts/aarch64-linux-musl-zig-c++ | 3 +++ build-scripts/x86_64-linux-musl-zig-c++ | 3 +++ 2 files changed, 6 insertions(+) create mode 100755 build-scripts/aarch64-linux-musl-zig-c++ create mode 100755 build-scripts/x86_64-linux-musl-zig-c++ diff --git a/build-scripts/aarch64-linux-musl-zig-c++ b/build-scripts/aarch64-linux-musl-zig-c++ new file mode 100755 index 00000000..654afcf3 --- /dev/null +++ b/build-scripts/aarch64-linux-musl-zig-c++ @@ -0,0 +1,3 @@ +#!/bin/sh + +$FLUVIO_BUILD_ZIG c++ -target aarch64-linux-musl $@ diff --git a/build-scripts/x86_64-linux-musl-zig-c++ b/build-scripts/x86_64-linux-musl-zig-c++ new file mode 100755 index 00000000..ef7674de --- /dev/null +++ b/build-scripts/x86_64-linux-musl-zig-c++ @@ -0,0 +1,3 @@ +#!/bin/sh + +$FLUVIO_BUILD_ZIG c++ -target x86_64-linux-musl $@ From c4b8765dd9830e3fb2f127ed7a3692aef8a0e667 Mon Sep 17 00:00:00 2001 From: Sebastian Imlay Date: Wed, 8 Jun 2022 16:54:53 -0400 Subject: [PATCH 14/18] wip --- Cargo.lock | 56 ------------------------- rust-connectors/sinks/kafka/Cargo.toml | 2 +- rust-connectors/sinks/kafka/src/main.rs | 9 +++- 3 files changed, 8 insertions(+), 59 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7847f6e2..864a7261 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1364,18 +1364,6 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" -[[package]] -name = "duct" -version = "0.13.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fc6a0a59ed0888e0041cf708e66357b7ae1a82f1c67247e1f93b5e0818f7d8d" -dependencies = [ - "libc", - "once_cell", - "os_pipe", - "shared_child", -] - [[package]] name = "dyn-clone" version = "1.0.5" @@ -2567,15 +2555,6 @@ dependencies = [ "libc", ] -[[package]] -name = "krb5-src" -version = "0.3.2+1.19.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44cd3b7e7735d48bc3793837041294f2eb747bd0f63bbc081e89972abb9e48fb" -dependencies = [ - "duct", -] - [[package]] name = "kv-log-macro" version = "1.0.7" @@ -2991,16 +2970,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "os_pipe" -version = "0.9.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb233f06c2307e1f5ce2ecad9f8121cffbbee2c95428f44ea85222e460d0d213" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "parking" version = "2.0.0" @@ -3477,9 +3446,7 @@ dependencies = [ "libc", "libz-sys", "num_enum", - "openssl-sys", "pkg-config", - "sasl2-sys", ] [[package]] @@ -3806,19 +3773,6 @@ dependencies = [ "winapi-util", ] -[[package]] -name = "sasl2-sys" -version = "0.1.20+2.1.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e645bd98535fc8fd251c43ba7c7c1f9be1e0369c99b6a5ea719052a773e655c" -dependencies = [ - "cc", - "duct", - "krb5-src", - "libc", - "pkg-config", -] - [[package]] name = "schannel" version = "0.1.20" @@ -4080,16 +4034,6 @@ dependencies = [ "lazy_static", ] -[[package]] -name = "shared_child" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6be9f7d5565b1483af3e72975e2dee33879b3b86bd48c0929fccf6585d79e65a" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "signal-hook" version = "0.3.14" diff --git a/rust-connectors/sinks/kafka/Cargo.toml b/rust-connectors/sinks/kafka/Cargo.toml index 0588e6f8..513aeed1 100644 --- a/rust-connectors/sinks/kafka/Cargo.toml +++ b/rust-connectors/sinks/kafka/Cargo.toml @@ -17,4 +17,4 @@ serde = "1" tokio-stream = "0.1" fluvio-future = { version = "0.3.15", features = ["subscriber"] } -rdkafka = { version = "0.28", features = ["cmake-build", "ssl-vendored", "ssl", "gssapi", "gssapi-vendored", "libz-static"] } +rdkafka = { version = "0.28", features = ["cmake-build"] } diff --git a/rust-connectors/sinks/kafka/src/main.rs b/rust-connectors/sinks/kafka/src/main.rs index 90ef2297..e6c19ae7 100644 --- a/rust-connectors/sinks/kafka/src/main.rs +++ b/rust-connectors/sinks/kafka/src/main.rs @@ -26,8 +26,9 @@ async fn main() -> anyhow::Result<()> { #[derive(StructOpt, Debug, JsonSchema, Clone)] pub struct KafkaOpt { + /// A Comma separated list of the kafka brokers to connect to #[structopt(long, env = "KAFKA_URL", hide_env_values = true)] - pub kafka_url: String, + pub kafka_brokers: String, #[structopt(long)] pub kafka_topic: Option, @@ -46,11 +47,15 @@ impl KafkaOpt { let mut stream = self.common.create_consumer_stream().await?; let producer: &FutureProducer = &ClientConfig::new() - .set("bootstrap.servers", "") + .set("bootstrap.servers", self.kafka_brokers.clone()) + + /* + * TODO: .set("security.protocol", "SASL_SSL") .set("sasl.mechanisms", "PLAIN") .set("sasl.username", "") .set("sasl.password", "") + */ .set("session.timeout.ms", "45000") .create() .expect("Producer creation error"); From 0e1e026a4c1f35cfdf8b8080ade5044483dca80e Mon Sep 17 00:00:00 2001 From: Sebastian Imlay Date: Wed, 8 Jun 2022 18:54:15 -0400 Subject: [PATCH 15/18] attempted to use topic create on rdkafka --- rust-connectors/sinks/kafka/src/main.rs | 35 +++++++++++++++++++------ 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/rust-connectors/sinks/kafka/src/main.rs b/rust-connectors/sinks/kafka/src/main.rs index e6c19ae7..3b4ce449 100644 --- a/rust-connectors/sinks/kafka/src/main.rs +++ b/rust-connectors/sinks/kafka/src/main.rs @@ -46,16 +46,35 @@ impl KafkaOpt { pub async fn execute(&self) -> anyhow::Result<()> { let mut stream = self.common.create_consumer_stream().await?; - let producer: &FutureProducer = &ClientConfig::new() + /* + * TODO: + .set("security.protocol", "SASL_SSL") + .set("sasl.mechanisms", "PLAIN") + .set("sasl.username", "") + .set("sasl.password", "") + */ + use rdkafka::admin::{ + AdminClient, + AdminOptions, + NewTopic, + TopicReplication, + }; + use rdkafka::config::FromClientConfig; + + let admin = AdminClient::from_config( + ClientConfig::new() .set("bootstrap.servers", self.kafka_brokers.clone()) + .set("session.timeout.ms", "45000") + )?; + let kafka_topic = self + .kafka_topic + .as_ref() + .unwrap_or(&self.common.fluvio_topic); + let new_topic = NewTopic::new(kafka_topic.as_str(), 1, TopicReplication::Fixed(1)); + let _ = admin.create_topics(&[new_topic], &AdminOptions::new()).await?; - /* - * TODO: - .set("security.protocol", "SASL_SSL") - .set("sasl.mechanisms", "PLAIN") - .set("sasl.username", "") - .set("sasl.password", "") - */ + let producer: &FutureProducer = &ClientConfig::new() + .set("bootstrap.servers", self.kafka_brokers.clone()) .set("session.timeout.ms", "45000") .create() .expect("Producer creation error"); From 2d64861436917d81a4bf980416f7e83c4c78d9e4 Mon Sep 17 00:00:00 2001 From: Sebastian Imlay Date: Wed, 8 Jun 2022 22:32:15 -0400 Subject: [PATCH 16/18] updates --- Cargo.lock | 1 + rust-connectors/sinks/kafka/Cargo.toml | 1 + rust-connectors/sinks/kafka/src/main.rs | 74 +++++++++++++++++------ rust-connectors/sources/kafka/src/main.rs | 5 +- 4 files changed, 60 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 864a7261..4c685162 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2510,6 +2510,7 @@ dependencies = [ "anyhow", "fluvio-connectors-common", "fluvio-future", + "kafka", "rdkafka", "schemars", "serde", diff --git a/rust-connectors/sinks/kafka/Cargo.toml b/rust-connectors/sinks/kafka/Cargo.toml index 513aeed1..975bbc0c 100644 --- a/rust-connectors/sinks/kafka/Cargo.toml +++ b/rust-connectors/sinks/kafka/Cargo.toml @@ -18,3 +18,4 @@ tokio-stream = "0.1" fluvio-future = { version = "0.3.15", features = ["subscriber"] } rdkafka = { version = "0.28", features = ["cmake-build"] } +kafka = "0.9.0" diff --git a/rust-connectors/sinks/kafka/src/main.rs b/rust-connectors/sinks/kafka/src/main.rs index 3b4ce449..cd2d784b 100644 --- a/rust-connectors/sinks/kafka/src/main.rs +++ b/rust-connectors/sinks/kafka/src/main.rs @@ -28,7 +28,7 @@ async fn main() -> anyhow::Result<()> { pub struct KafkaOpt { /// A Comma separated list of the kafka brokers to connect to #[structopt(long, env = "KAFKA_URL", hide_env_values = true)] - pub kafka_brokers: String, + pub kafka_url: String, #[structopt(long)] pub kafka_topic: Option, @@ -41,40 +41,36 @@ pub struct KafkaOpt { use rdkafka::config::ClientConfig; use rdkafka::producer::{FutureProducer, FutureRecord}; use std::time::Duration; - impl KafkaOpt { pub async fn execute(&self) -> anyhow::Result<()> { let mut stream = self.common.create_consumer_stream().await?; /* - * TODO: - .set("security.protocol", "SASL_SSL") - .set("sasl.mechanisms", "PLAIN") - .set("sasl.username", "") - .set("sasl.password", "") - */ - use rdkafka::admin::{ - AdminClient, - AdminOptions, - NewTopic, - TopicReplication, - }; + * TODO: + .set("security.protocol", "SASL_SSL") + .set("sasl.mechanisms", "PLAIN") + .set("sasl.username", "") + .set("sasl.password", "") + */ + use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}; use rdkafka::config::FromClientConfig; let admin = AdminClient::from_config( ClientConfig::new() - .set("bootstrap.servers", self.kafka_brokers.clone()) - .set("session.timeout.ms", "45000") + .set("bootstrap.servers", self.kafka_url.clone()) + .set("session.timeout.ms", "45000"), )?; let kafka_topic = self .kafka_topic .as_ref() .unwrap_or(&self.common.fluvio_topic); let new_topic = NewTopic::new(kafka_topic.as_str(), 1, TopicReplication::Fixed(1)); - let _ = admin.create_topics(&[new_topic], &AdminOptions::new()).await?; + let _ = admin + .create_topics(&[new_topic], &AdminOptions::new()) + .await?; let producer: &FutureProducer = &ClientConfig::new() - .set("bootstrap.servers", self.kafka_brokers.clone()) + .set("bootstrap.servers", self.kafka_url.clone()) .set("session.timeout.ms", "45000") .create() .expect("Producer creation error"); @@ -104,3 +100,45 @@ impl KafkaOpt { Ok(()) } } +/* +use kafka::client::{KafkaClient, ProduceMessage, RequiredAcks}; +use std::time::Duration; + +impl KafkaOpt { + pub async fn execute(&self) -> anyhow::Result<()> { + let mut stream = self.common.create_consumer_stream().await?; + let mut kafka_client = KafkaClient::new(vec![self.kafka_url.clone()]); + //let mut kafka_client = KafkaClient::new(vec![self.kafka_url.clone()]); + let _ = kafka_client.load_metadata_all()?; + + info!("Starting stream"); + while let Some(Ok(record)) = stream.next().await { + let _ = self.send_to_kafka(&record, &mut kafka_client).await?; + } + Ok(()) + } + pub async fn send_to_kafka( + &self, + record: &Record, + kafka_client: &mut KafkaClient, + ) -> anyhow::Result<()> { + let kafka_topic = self + .kafka_topic + .as_ref() + .unwrap_or(&self.common.fluvio_topic); + let text = String::from_utf8_lossy(record.value()); + debug!("Sending {:?}, to kafka", text); + let msg = ProduceMessage::new( + kafka_topic, + self.common.fluvio_partition, + record.key(), + Some(record.value()), + ); + let req = vec![msg]; + let resp = + kafka_client.produce_messages(RequiredAcks::All, Duration::from_millis(10000), req)?; + debug!("Sent {:?}", resp); + Ok(()) + } +} +*/ diff --git a/rust-connectors/sources/kafka/src/main.rs b/rust-connectors/sources/kafka/src/main.rs index e9cbfd28..2ca17a1e 100644 --- a/rust-connectors/sources/kafka/src/main.rs +++ b/rust-connectors/sources/kafka/src/main.rs @@ -1,5 +1,5 @@ use fluvio_connectors_common::opt::CommonSourceOpt; -use fluvio_future::tracing::{debug, info}; +use fluvio_future::tracing::info; use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage}; use schemars::schema_for; use schemars::JsonSchema; @@ -64,9 +64,8 @@ impl KafkaOpt { loop { for ms in consumer.poll().unwrap().iter() { for m in ms.messages() { + info!("Sending {:?} to fluvio", m); let _ = producer.send(m.key, m.value).await?; - - debug!("{:?}", m); } let _ = consumer.consume_messageset(ms)?; } From 70db92b971740eec1239a079a5d77582dcb27d10 Mon Sep 17 00:00:00 2001 From: Sebastian Imlay Date: Fri, 10 Jun 2022 13:19:30 -0400 Subject: [PATCH 17/18] Update readmes and changelogs --- Cargo.lock | 1 - rust-connectors/sinks/kafka/CHANGELOG.md | 4 +-- rust-connectors/sinks/kafka/README.md | 16 +++++----- rust-connectors/sinks/kafka/src/main.rs | 34 +++++++++++++--------- rust-connectors/sources/kafka/CHANGELOG.md | 4 +-- rust-connectors/sources/kafka/README.md | 17 ++++++----- rust-connectors/sources/kafka/src/main.rs | 6 +++- 7 files changed, 48 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4c685162..864a7261 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2510,7 +2510,6 @@ dependencies = [ "anyhow", "fluvio-connectors-common", "fluvio-future", - "kafka", "rdkafka", "schemars", "serde", diff --git a/rust-connectors/sinks/kafka/CHANGELOG.md b/rust-connectors/sinks/kafka/CHANGELOG.md index d5199577..1f5ff922 100644 --- a/rust-connectors/sinks/kafka/CHANGELOG.md +++ b/rust-connectors/sinks/kafka/CHANGELOG.md @@ -1,3 +1,3 @@ -# Slack Connector Change Log +# Kafka Sink Change Log -## http Version 0.1.0 - UNRELEASED +## kafka-sink Version 0.1.0 - UNRELEASED diff --git a/rust-connectors/sinks/kafka/README.md b/rust-connectors/sinks/kafka/README.md index 44430a52..a88da1bf 100644 --- a/rust-connectors/sinks/kafka/README.md +++ b/rust-connectors/sinks/kafka/README.md @@ -1,10 +1,12 @@ -# Fluvio Slack Connector +# Fluvio Kafka Sink Connector -## Slack Sink Connector +This is a connector for taking data from a Kafka topic and going to a fluvio topic -Controls the Source connector -| Option | default | type | description | -| :--- | :--- | :--- | :---- | -| WEBHOOK_URL | - | String | The secret key for the slack webhook url | -| webhook-url | - | String | The paramaters key for the slack webhook url| +## Controls the Sink connector + +| Option | default | type | description | +| :--- | :--- | :--- | :---- | +| kafka-url | - | String | The url for the kafka connector | +| kafka-topic | same topic as fluvio | String | The kafka topic | +| kafka-partition | 0 | String | The kafka partition | diff --git a/rust-connectors/sinks/kafka/src/main.rs b/rust-connectors/sinks/kafka/src/main.rs index cd2d784b..7f0918bb 100644 --- a/rust-connectors/sinks/kafka/src/main.rs +++ b/rust-connectors/sinks/kafka/src/main.rs @@ -33,6 +33,9 @@ pub struct KafkaOpt { #[structopt(long)] pub kafka_topic: Option, + #[structopt(long)] + pub kafka_partition: Option, + #[structopt(flatten)] #[schemars(flatten)] pub common: CommonSourceOpt, @@ -43,27 +46,26 @@ use rdkafka::producer::{FutureProducer, FutureRecord}; use std::time::Duration; impl KafkaOpt { pub async fn execute(&self) -> anyhow::Result<()> { - let mut stream = self.common.create_consumer_stream().await?; - - /* - * TODO: - .set("security.protocol", "SASL_SSL") - .set("sasl.mechanisms", "PLAIN") - .set("sasl.username", "") - .set("sasl.password", "") - */ use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}; use rdkafka::config::FromClientConfig; + let kafka_topic = self + .kafka_topic + .as_ref() + .unwrap_or(&self.common.fluvio_topic); let admin = AdminClient::from_config( ClientConfig::new() .set("bootstrap.servers", self.kafka_url.clone()) .set("session.timeout.ms", "45000"), + /* + * TODO: Add SASL Authentication + .set("security.protocol", "SASL_SSL") + .set("sasl.mechanisms", "PLAIN") + .set("sasl.username", "") + .set("sasl.password", "") + */ )?; - let kafka_topic = self - .kafka_topic - .as_ref() - .unwrap_or(&self.common.fluvio_topic); + let new_topic = NewTopic::new(kafka_topic.as_str(), 1, TopicReplication::Fixed(1)); let _ = admin .create_topics(&[new_topic], &AdminOptions::new()) @@ -76,6 +78,7 @@ impl KafkaOpt { .expect("Producer creation error"); info!("Starting stream"); + let mut stream = self.common.create_consumer_stream().await?; while let Some(Ok(record)) = stream.next().await { let _ = self.send_to_kafka(&record, producer).await?; } @@ -90,9 +93,12 @@ impl KafkaOpt { .kafka_topic .as_ref() .unwrap_or(&self.common.fluvio_topic); - let kafka_record = FutureRecord::to(kafka_topic.as_str()) + let mut kafka_record = FutureRecord::to(kafka_topic.as_str()) .payload(record.value()) .key(record.key().unwrap_or(&[])); + if let Some(kafka_partition) = self.kafka_partition { + kafka_record = kafka_record.partition(kafka_partition); + } let res = kafka_producer .send(kafka_record, Duration::from_secs(0)) .await; diff --git a/rust-connectors/sources/kafka/CHANGELOG.md b/rust-connectors/sources/kafka/CHANGELOG.md index d5199577..9ea91123 100644 --- a/rust-connectors/sources/kafka/CHANGELOG.md +++ b/rust-connectors/sources/kafka/CHANGELOG.md @@ -1,3 +1,3 @@ -# Slack Connector Change Log +# Kafka Source Connector Change Log -## http Version 0.1.0 - UNRELEASED +## kafka-source version 0.1.0 - UNRELEASED diff --git a/rust-connectors/sources/kafka/README.md b/rust-connectors/sources/kafka/README.md index 44430a52..f5766543 100644 --- a/rust-connectors/sources/kafka/README.md +++ b/rust-connectors/sources/kafka/README.md @@ -1,10 +1,13 @@ -# Fluvio Slack Connector +# Fluvio Kafka Source Connector -## Slack Sink Connector +This is a connector for taking data from a Kafka topic and going to a fluvio topic -Controls the Source connector -| Option | default | type | description | -| :--- | :--- | :--- | :---- | -| WEBHOOK_URL | - | String | The secret key for the slack webhook url | -| webhook-url | - | String | The paramaters key for the slack webhook url| +## Controls the Source connector + +| Option | default | type | description | +| :--- | :--- | :--- | :---- | +| kafka-url | - | String | The url for the kafka connector | +| kafka-topic | same topic as fluvio | String | The kafka topic | +| kafka-partition | 0 | String | The kafka partition | +| kafka-group | "fluvio-kafka-source" | String | The kafka group | diff --git a/rust-connectors/sources/kafka/src/main.rs b/rust-connectors/sources/kafka/src/main.rs index 2ca17a1e..2efe8c9b 100644 --- a/rust-connectors/sources/kafka/src/main.rs +++ b/rust-connectors/sources/kafka/src/main.rs @@ -35,6 +35,10 @@ pub struct KafkaOpt { #[structopt(long)] pub kafka_topic: Option, + #[structopt(long, default_value = "0")] + #[schemars(skip)] + pub kafka_partition: i32, + #[structopt(flatten)] #[schemars(flatten)] pub common: CommonSourceOpt, @@ -50,7 +54,7 @@ impl KafkaOpt { .unwrap_or(&self.common.fluvio_topic); let mut consumer = Consumer::from_hosts(vec![self.kafka_url.clone()]) - .with_topic_partitions(kafka_topic.clone(), &[self.common.fluvio_partition]) + .with_topic_partitions(kafka_topic.clone(), &[self.kafka_partition]) .with_fallback_offset(FetchOffset::Earliest) .with_group( self.kafka_group From 8222e9f4d88af4d55e72231cc7975d7a89226335 Mon Sep 17 00:00:00 2001 From: Sebastian Imlay Date: Fri, 10 Jun 2022 13:20:41 -0400 Subject: [PATCH 18/18] Added kafka connectors to release matrix --- .github/workflows/release.yaml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 0f9f380b..3b53f2f0 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -40,6 +40,13 @@ jobs: - os: ubuntu-latest connector-name: dynamodb-sink image-name: infinyon/fluvio-connect-dynamodb-sink + - os: ubuntu-latest + connector-name: kafka-sink + image-name: infinyon/fluvio-connect-kafka-sink + - os: ubuntu-latest + connector-name: kafka-source + image-name: infinyon/fluvio-connect-kafka-source + steps: - uses: actions/checkout@v3 - name: Try pulling ${{ matrix.connector-name }} x86_64-unknown-linux-musl docker images