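I'm trying to speed up some processes that post large numbers of records (mostly several million) to Elasticsearch. In my C# code I've already implemented a multi-threaded solution using TPL Dataflow, as follows: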
```csharp
var fetchRecords = new TransformBlock<string, List<?>>(start => { ... });
var sendRecordsToElastic = new ActionBlock<List<?>>(records => sendBulkRequest(records));
fetchRecords.LinkTo(sendRecordsToElastic, new DataflowLinkOptions { PropagateCompletion = true });
fetchRecords.Post("Start");
```
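For experimentation, here is a self-contained sketch of the same pipeline shape that actually runs. It is a stand-in built on assumptions, not the asker's code: the records are plain integers, a BatchBlock<int> (not in the original) does the grouping, and the hypothetical send step only counts records after a short fake delay. Concurrency is bounded with ExecutionDataflowBlockOptions.MaxDegreeOfParallelism rather than a lock, and the sketch also calls Complete() on the head block and awaits the last block's Completion, which the snippet above leaves out.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowPipelineSketch
{
    // Pushes `totalRecords` fake records through fetch -> batch -> send and
    // returns how many records the (simulated) send step received.
    public static async Task<int> RunAsync(int totalRecords, int batchSize, int maxParallelSends)
    {
        int sent = 0;

        // Hypothetical fetch step: the single "Start" message fans out into records.
        var fetchRecords = new TransformManyBlock<string, int>(
            start => Enumerable.Range(0, totalRecords));

        // Groups records into arrays of batchSize; the final batch may be smaller.
        var batchRecords = new BatchBlock<int>(batchSize);

        // Stand-in for sendBulkRequest. Dataflow runs up to maxParallelSends
        // batches concurrently, so no static lock or hand-made threads are needed.
        var sendRecordsToElastic = new ActionBlock<int[]>(
            async batch =>
            {
                await Task.Delay(1); // pretend network call
                Interlocked.Add(ref sent, batch.Length);
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallelSends });

        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        fetchRecords.LinkTo(batchRecords, linkOptions);
        batchRecords.LinkTo(sendRecordsToElastic, linkOptions);

        fetchRecords.Post("Start");
        fetchRecords.Complete();               // no more input after "Start"
        await sendRecordsToElastic.Completion; // finishes after the last batch is sent
        return sent;
    }
}
```

With PropagateCompletion set on both links, completing the head block flushes the final partial batch out of the BatchBlock before the ActionBlock completes.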
Then the sendBulkRequest method I'm implementing looks like this:
```csharp
public IBulkResponse sendBulkRequest(List<?> records)
{
    lock (SomeStaticObject)
    {
        // Execute several new threads to send records in bulk
    }
}
```
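My question is about the practicality of spawning additional threads inside a lock that is part of a Dataflow pipeline.
Is this OK? Are there any potential problems I should expect in terms of performance, execution, cache/memory misses, etc.?
Any insight would be gratefully received.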
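One consequence of the lock is easy to check: however much parallelism the ActionBlock is given, a lock on a shared static object lets only one handler run its body at a time, so the block's own parallelism is wasted and any speed-up has to come from the threads started inside the lock. The sketch below assumes a made-up 5 ms sleep standing in for the bulk request and measures the peak number of handlers inside the critical section; with the lock in place it should always be 1.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class LockSerializationSketch
{
    // Stand-in for the question's SomeStaticObject.
    private static readonly object SomeStaticObject = new object();

    // Runs `messages` items through an ActionBlock allowed 4 concurrent handlers, where each
    // handler takes the static lock, and returns the peak number of handlers seen inside it at once.
    public static int PeakConcurrencyInsideLock(int messages)
    {
        int current = 0, peak = 0;

        var block = new ActionBlock<int>(_ =>
        {
            lock (SomeStaticObject)
            {
                int now = Interlocked.Increment(ref current);

                // Atomically raise `peak` to `now` if `now` is larger.
                int seen;
                while (now > (seen = Volatile.Read(ref peak)) &&
                       Interlocked.CompareExchange(ref peak, now, seen) != seen)
                {
                }

                Thread.Sleep(5); // pretend bulk request
                Interlocked.Decrement(ref current);
            }
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        for (int i = 0; i < messages; i++)
            block.Post(i);
        block.Complete();
        block.Completion.Wait();
        return peak;
    }
}
```

Removing the lock, or replacing it with a SemaphoreSlim sized to the number of concurrent requests you want, would let MaxDegreeOfParallelism on the block take effect instead.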
You might want to use BulkAll here, which implements the observable pattern to make concurrent bulk requests to Elasticsearch. Here's an example:
```csharp
void Main()
{
    var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
    var connectionSettings = new ConnectionSettings(pool);
    var client = new ElasticClient(connectionSettings);
    var indexName = "bulk-index";

    if (client.IndexExists(indexName).Exists)
        client.DeleteIndex(indexName);

    client.CreateIndex(indexName, c => c
        .Settings(s => s
            .NumberOfShards(3)
            .NumberOfReplicas(0)
        )
        .Mappings(m => m
            .Map<DeviceStatus>(p => p.AutoMap())
        )
    );

    var size = 500;

    // set up the observable
    var bulkAllObservable = client.BulkAll(GetDeviceStatus(), b => b
        .Index(indexName)
        .MaxDegreeOfParallelism(4)
        .RefreshOnCompleted()
        .Size(size)
    );

    var countdownEvent = new CountdownEvent(1);
    Exception exception = null;

    // set up an observer. Delegates passed are:
    // 1. onNext
    // 2. onError
    // 3. onCompleted
    var bulkAllObserver = new BulkAllObserver(
        response => Console.WriteLine($"Indexed {response.Page * size} with {response.Retries} retries"),
        ex =>
        {
            // capture exception for throwing outside Observer.
            // You may decide to do something different here
            exception = ex;
            countdownEvent.Signal();
        },
        () =>
        {
            Console.WriteLine("Finished");
            countdownEvent.Signal();
        });

    // subscribe to the observable
    bulkAllObservable.Subscribe(bulkAllObserver);

    // wait indefinitely for it to finish. May want to put a
    // max timeout on this
    countdownEvent.Wait();

    if (exception != null)
    {
        throw exception;
    }
}

// lazily enumerated collection
private static IEnumerable<DeviceStatus> GetDeviceStatus()
{
    for (var i = 0; i < DocumentCount; i++)
        yield return new DeviceStatus(i);
}

private const int DocumentCount = 20000;

public class DeviceStatus
{
    public DeviceStatus(int id) => Id = id;
    public int Id { get; set; }
}
```
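The comment in the example suggests putting a maximum timeout on the wait. CountdownEvent.Wait(TimeSpan) returns false when the timeout elapses, so a generic version of the same observer pattern over System.IObservable<T> might look like the sketch below. It is an illustration only: WaitForCompletion, DelegateObserver and BackgroundRange are hypothetical names, and this is not how NEST implements its own Wait.

```csharp
using System;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;

public static class ObservableWaitSketch
{
    // Blocks the calling thread until the observable completes or faults, and
    // throws TimeoutException if that takes longer than maximumRunTime.
    public static void WaitForCompletion<T>(
        this IObservable<T> observable, TimeSpan maximumRunTime, Action<T> onNext)
    {
        // Deliberately not disposed: a late Signal after a timeout must not hit a disposed event.
        var done = new CountdownEvent(1);
        Exception error = null;

        using (observable.Subscribe(new DelegateObserver<T>(
                   onNext,
                   ex => { error = ex; done.Signal(); },
                   () => done.Signal())))
        {
            if (!done.Wait(maximumRunTime))
                throw new TimeoutException($"Observable did not complete within {maximumRunTime}.");
        }

        if (error != null)
            ExceptionDispatchInfo.Capture(error).Throw(); // rethrow, keeping the original stack trace
    }

    // Minimal IObserver<T> that forwards to the three delegates (onNext, onError, onCompleted).
    private sealed class DelegateObserver<T> : IObserver<T>
    {
        private readonly Action<T> _onNext;
        private readonly Action<Exception> _onError;
        private readonly Action _onCompleted;

        public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            _onNext = onNext;
            _onError = onError;
            _onCompleted = onCompleted;
        }

        public void OnNext(T value) => _onNext(value);
        public void OnError(Exception error) => _onError(error);
        public void OnCompleted() => _onCompleted();
    }
}

// Hypothetical source for trying it out: pushes 0..count-1 on a background thread, then completes.
public sealed class BackgroundRange : IObservable<int>
{
    private readonly int _count;
    public BackgroundRange(int count) => _count = count;

    public IDisposable Subscribe(IObserver<int> observer)
    {
        Task.Run(() =>
        {
            for (int i = 0; i < _count; i++) observer.OnNext(i);
            observer.OnCompleted();
        });
        return new NoopDisposable();
    }

    private sealed class NoopDisposable : IDisposable { public void Dispose() { } }
}
```

Using ExceptionDispatchInfo instead of a plain `throw exception;` keeps the original stack trace of the error reported through onError.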
If you don't need to do anything special in the observer, you can call .Wait() on the observable instead:
```csharp
var bulkAllObservable = client.BulkAll(GetDeviceStatus(), b => b
        .Index(indexName)
        .MaxDegreeOfParallelism(4)
        .RefreshOnCompleted()
        .Size(size)
    )
    .Wait(
        TimeSpan.FromHours(1),
        response => Console.WriteLine($"Indexed {response.Page * size} with {response.Retries} retries")
    );
```
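To make Size and MaxDegreeOfParallelism concrete, the following sketch shows the general idea of paging a lazily enumerated source and keeping a bounded number of pages in flight, using a SemaphoreSlim. It is not NEST's implementation (BulkAll also retries failed pages, as the Retries value in the callbacks suggests), and SendInPagesAsync and its sendPage callback are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class PagedBulkSketch
{
    // Reads `source` lazily, cuts it into pages of `size` items and hands each page to
    // `sendPage`, with at most `maxDegreeOfParallelism` pages in flight at once.
    // Returns the size of every page, in page order.
    public static async Task<int[]> SendInPagesAsync<T>(
        IEnumerable<T> source,
        int size,
        int maxDegreeOfParallelism,
        Func<int, IReadOnlyList<T>, Task> sendPage)
    {
        using var throttle = new SemaphoreSlim(maxDegreeOfParallelism);
        var inFlight = new List<Task>();
        var pageSizes = new List<int>();
        var pageNumber = 0;

        // Sends one page and frees its slot whether the send succeeds or fails.
        async Task SendAsync(int number, List<T> items)
        {
            try { await sendPage(number, items); }
            finally { throttle.Release(); }
        }

        // Waits for a free slot, then starts sending the page.
        async Task StartPageAsync(List<T> items)
        {
            await throttle.WaitAsync();
            pageSizes.Add(items.Count);
            inFlight.Add(SendAsync(pageNumber++, items));
        }

        var page = new List<T>(size);
        foreach (var item in source)
        {
            page.Add(item);
            if (page.Count == size)
            {
                await StartPageAsync(page);
                page = new List<T>(size);
            }
        }
        if (page.Count > 0)
            await StartPageAsync(page);

        await Task.WhenAll(inFlight);
        return pageSizes.ToArray();
    }
}
```

Because the loop stops pulling from the source whenever all slots are busy, at most maxDegreeOfParallelism pages are being sent at any moment and the source is not read far ahead of the sends.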
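There are observable helper methods for BulkAll, ScrollAll and Reindex (although there is also ReindexOnServer, which reindexes within Elasticsearch and maps to the reindex API; the Reindex method predates it).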