Mirror of https://github.com/sb745/NyaaV3.git
sync_es: fix flush_interval behavior during slow times

Instead of flushing every N seconds, the poster flushed N seconds after the last change, so collecting a batch could drag out to as long as N seconds * M (the batch size) when updates are sparse. Practically this changes nothing, since something is always happening. Also fix the save point not being written when nothing is happening. Again, practically a no-op, but for correctness.
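For illustration, a minimal sketch of the before/after draining logic as standalone functions. These are hypothetical helpers, not part of sync_es.py; read_buf, chunk_size, and flush_interval mirror the attributes the real loop uses, but the queue here holds plain items rather than the ((log_file, log_pos, timestamp), action) tuples the real code reads.

import time
from queue import Queue, Empty

def drain_old(read_buf: Queue, chunk_size: int, flush_interval: float) -> list:
    # old behavior: the timeout restarts on every get(), so a slow trickle of
    # events can keep the batch open for up to chunk_size * flush_interval seconds
    actions = []
    while len(actions) < chunk_size:
        try:
            actions.append(read_buf.get(block=True, timeout=flush_interval))
        except Empty:
            break
    return actions

def drain_new(read_buf: Queue, chunk_size: int, flush_interval: float) -> list:
    # new behavior: one deadline for the whole batch, so the batch is flushed
    # at most flush_interval seconds after it was started, full or not
    actions = []
    now = time.time()
    deadline = now + flush_interval
    while len(actions) < chunk_size and now < deadline:
        try:
            actions.append(read_buf.get(block=True, timeout=deadline - now))
        except Empty:
            break
        now = time.time()
    return actions

With drain_old, a queue that yields one event just before each timeout expires keeps resetting the wait and the batch stays open; with drain_new, the shared deadline caps the wait regardless of how the events trickle in.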
parent 33852a55bf
commit eceb8824dc

1 changed file with 39 additions and 26 deletions:
sync_es.py
@@ -263,54 +263,67 @@ class EsPoster(ExitingThread):
         last_save = time.time()
         since_last = 0
+        # XXX keep track of last posted position for save points, awkward
+        posted_log_file = None
+        posted_log_pos = None

         while True:
             actions = []
-            while len(actions) < self.chunk_size:
+            now = time.time()
+            # wait up to flush_interval seconds after starting the batch
+            deadline = now + self.flush_interval
+            while len(actions) < self.chunk_size and now < deadline:
+                timeout = deadline - now
                 try:
                     # grab next event from queue with metadata that creepily
                     # updates, surviving outside the scope of the loop
                     ((log_file, log_pos, timestamp), action) = \
-                            self.read_buf.get(block=True, timeout=self.flush_interval)
+                            self.read_buf.get(block=True, timeout=timeout)
                     actions.append(action)
+                    now = time.time()
                 except Empty:
                     # nothing new for the whole interval
                     break

-            if not actions:
-                # nothing to post
-                log.debug("no changes...")
-                continue
+            if actions:
+                # XXX "time" to get histogram of no events per bulk
+                stats.timing('actions_per_bulk', len(actions))

-            # XXX "time" to get histogram of no events per bulk
-            stats.timing('actions_per_bulk', len(actions))
-
-            try:
-                with stats.timer('post_bulk'):
-                    bulk(es, actions, chunk_size=self.chunk_size)
-            except BulkIndexError as bie:
-                 # in certain cases where we're really out of sync, we update a
-                 # stat when the torrent doc is, causing a "document missing"
-                 # error from es, with no way to suppress that server-side.
-                 # Thus ignore that type of error if it's the only problem
-                for e in bie.errors:
-                    try:
-                        if e['update']['error']['type'] != 'document_missing_exception':
+                try:
+                    with stats.timer('post_bulk'):
+                        bulk(es, actions, chunk_size=self.chunk_size)
+                except BulkIndexError as bie:
+                     # in certain cases where we're really out of sync, we update a
+                     # stat when the torrent doc is, causing a "document missing"
+                     # error from es, with no way to suppress that server-side.
+                     # Thus ignore that type of error if it's the only problem
+                    for e in bie.errors:
+                        try:
+                            if e['update']['error']['type'] != 'document_missing_exception':
+                                raise bie
+                        except KeyError:
                             raise bie
-                    except KeyError:
-                        raise bie

-            # how far we're behind, wall clock
-            stats.gauge('process_latency', int((time.time() - timestamp) * 1000))
+                # how far we've gotten in the actual log
+                posted_log_file = log_file
+                posted_log_pos = log_pos
+
+                # how far we're behind, wall clock
+                stats.gauge('process_latency', int((time.time() - timestamp) * 1000))
+            else:
+                log.debug("no changes...")

             since_last += len(actions)
-            if since_last >= 10000 or (time.time() - last_save) > 10:
+            # TODO instead of this manual timeout loop, could move this to another queue/thread
+            if posted_log_file is not None and (since_last >= 10000 or (time.time() - last_save) > 10):
                 log.info(f"saving position {log_file}/{log_pos}, {time.time() - timestamp:,.3f} seconds behind")
                 with stats.timer('save_pos'):
                     with open(SAVE_LOC, 'w') as f:
-                        json.dump({"log_file": log_file, "log_pos": log_pos}, f)
+                        json.dump({"log_file": posted_log_file, "log_pos": posted_log_pos}, f)
                 last_save = time.time()
                 since_last = 0
+                posted_log_file = None
+                posted_log_pos = None


 # in-memory queue between binlog and es. The bigger it is, the more events we
 # can parse in memory while waiting for es to catch up, at the expense of heap.
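The second half of the change is the save-point bookkeeping. A minimal standalone sketch of that pattern follows; SavePointTracker and save_position are hypothetical names standing in for the inline SAVE_LOC handling in sync_es.py, and the 10000-event / 10-second thresholds are copied from the diff. The idea: only persist positions that have actually been bulk-posted, still flush a pending position on idle iterations, and clear it after saving so the same position is not rewritten indefinitely.

import json
import time

def save_position(path, log_file, log_pos):
    # persist the binlog coordinates the sync can safely resume from
    with open(path, 'w') as f:
        json.dump({"log_file": log_file, "log_pos": log_pos}, f)

class SavePointTracker:
    # illustrative helper, not part of sync_es.py
    def __init__(self, path, max_events=10000, max_seconds=10):
        self.path = path
        self.max_events = max_events
        self.max_seconds = max_seconds
        self.posted = None          # last (log_file, log_pos) actually posted to ES
        self.since_last = 0
        self.last_save = time.time()

    def record_posted(self, log_file, log_pos, n_actions):
        # call only after a successful bulk(), never for events merely read
        self.posted = (log_file, log_pos)
        self.since_last += n_actions

    def maybe_save(self):
        # runs every loop iteration, including idle ones, so a pending
        # position still gets written when the binlog goes quiet
        due = (self.since_last >= self.max_events
               or time.time() - self.last_save > self.max_seconds)
        if self.posted is not None and due:
            save_position(self.path, *self.posted)
            self.last_save = time.time()
            self.since_last = 0
            self.posted = None

# typical use inside the poster loop (illustrative):
#   tracker.record_posted(log_file, log_pos, len(actions))   # after bulk() succeeds
#   tracker.maybe_save()                                      # every iteration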