# Written by Pawel Garbacki
# see LICENSE.txt for license information

import sys
from traceback import print_exc, print_stack
from time import time

from Logger import get_logger
from Swapper.Overlay.SecureOverlay import SecureOverlay
from Swapper.CacheDB.CacheDBHandler import PeerDBHandler
from BitTornado.bencode import bencode
from BitTornado.BT1.MessageID import RESERVE_PIECES

# Helper.is_complete() only re-evaluates completeness on every
# MAX_ROUNDS-th call; all other calls answer False without comparing lists.
MAX_ROUNDS = 200
DEBUG = False


class SingleDownloadHelperInterface:
    """ This interface should contain all methods that the PiecePiecker/Helper
        calls on the SingleDownload class.
    """

    def __init__(self):
        # True while the Helper is holding this download back (see
        # Helper.send_or_queue_reservation / Helper.restart).
        self.frozen_by_helper = False

    def helper_set_freezing(self, val):
        """ Set the frozen-by-helper flag to val. """
        self.frozen_by_helper = val

    def is_frozen_by_helper(self):
        """ Return the current frozen-by-helper flag. """
        return self.frozen_by_helper

    def is_choked(self):
        """ Placeholder; no behaviour defined here. """
        pass

    def helper_forces_unchoke(self):
        """ Placeholder; no behaviour defined here. """
        pass

    def _request_more(self, new_unchoke = False):
        """ Placeholder; no behaviour defined here. """
        pass


class Helper:
    """ Keeps track of which pieces this helper has reserved or must ignore,
        and requests reservations from the coordinator.

        Reservations are made either directly through a `coordinator` object
        (when one is passed to the constructor) or by sending a
        RESERVE_PIECES message over the secure overlay to
        `coordinator_permid`.
    """

    def __init__(self, torrent_hash, num_pieces, coordinator_permid, coordinator = None):
        """ torrent_hash:       prepended to every RESERVE_PIECES payload.
            num_pieces:         length of the per-piece bookkeeping lists.
            coordinator_permid: permid of the coordinator; '' is treated the
                                same as None.
            coordinator:        optional object whose reserve_pieces() and
                                reserved_pieces are used instead of overlay
                                messages.
        """
        self.secure_overlay = SecureOverlay.getInstance()
        self.torrent_hash = torrent_hash

        # An empty permid means "no coordinator".
        if coordinator_permid == '':
            self.coordinator_permid = None
        else:
            self.coordinator_permid = coordinator_permid

        self.coordinator_ip = None  # see is_coordinator()
        self.coordinator_port = -1
        if self.coordinator_permid is not None:
            # Look up the coordinator's address; both fields keep their
            # defaults when the peer is not in the database.
            peerdb = PeerDBHandler()
            peer = peerdb.getPeer(coordinator_permid)
            if peer is not None:
                self.coordinator_ip = peer['ip']
                self.coordinator_port = peer['port']

        self.reserved_pieces = [False] * num_pieces
        self.ignored_pieces = [False] * num_pieces
        self.coordinator = coordinator
        # Incremented per reservation sent, decremented per reply handled.
        self.counter = 0
        self.completed = False
        self.distr_reserved_pieces = [False] * num_pieces
        # All-True reference list that is_complete() compares against.
        self.marker = [True] * num_pieces
        self.round = 0
        self.encoder = None
        # Downloads waiting for a reservation to be sent on their behalf.
        self.continuations = []
        # Download whose reservation request is currently awaiting a reply.
        self.outstanding = None
        self.last_req_time = 0

    def set_encoder(self,encoder):
        """ Store the encoder and tell it the coordinator's IP. """
        self.encoder = encoder
        self.encoder.set_coordinator_ip(self.coordinator_ip)
        # To support a helping user stopping and restarting a torrent
        if self.coordinator_permid is not None:
            self.start_data_connection()

    def test(self):
        """ Ad-hoc smoke test: reserve piece 10 and report the result. """
        result = self.reserve_piece(10,None)
        print >> sys.stderr,"reserve piece returned: " + str(result)
        print >> sys.stderr,"Test passed"

    def _reserve_piece(self, piece):
        """ Mark piece as reserved and clear its ignored flag. """
        self.reserved_pieces[piece] = True
        self.distr_reserved_pieces[piece] = True
        self.ignored_pieces[piece] = False

    def _ignore_piece(self, piece):
        """ Mark piece as ignored, unless it is (considered) reserved. """
        if not self.is_reserved(piece):
            self.ignored_pieces[piece] = True
            self.distr_reserved_pieces[piece] = True

    def is_coordinator(self,permid):
        """ Return True iff permid is the coordinator's permid and the
            coordinator's IP address is known.
        """
        # If we couldn't get coordinator_ip, don't help
        if self.coordinator_ip is None:
            return False
        return self.coordinator_permid == permid

    ### PiecePicker and Downloader interface
    def is_reserved(self, piece):
        """ Return True if piece is reserved, or if a coordinator object is
            set and is_complete() reports completion.
        """
        if self.reserved_pieces[piece]:
            return True
        # NB: is_complete() advances the round counter, so it is only
        # consulted when the cheap per-piece check above did not decide.
        if self.coordinator is not None and self.is_complete():
            return True
        return False

    def is_ignored(self, piece):
        """ Return True if piece is flagged as ignored; always False once a
            coordinator object is set and is_complete() reports completion.
        """
        if not self.ignored_pieces[piece]:
            return False
        if self.coordinator is not None and self.is_complete():
            return False
        return True

    def is_complete(self):
        """ Return True once every piece has been handed out.

            The (list-wide) comparison is only performed on every
            MAX_ROUNDS-th call; the calls in between return False. Once the
            answer has been True it is cached in self.completed.
        """
        if self.completed:
            return True
        self.round = (self.round + 1) % MAX_ROUNDS
        if self.round != 0:
            return False
        if self.coordinator is not None:
            self.completed = (self.coordinator.reserved_pieces == self.marker)
        else:
            self.completed = (self.distr_reserved_pieces == self.marker)
        return self.completed

    def reserve_pieces(self, pieces, sdownload, all_or_nothing = False):
        """ Try to reserve the given pieces.

            pieces:         piece indices to reserve.
            sdownload:      the download asking; it is queued/frozen when the
                            request has to go out over the overlay.
            all_or_nothing: forwarded to coordinator.reserve_pieces() when a
                            coordinator object is set.

            Returns the list of pieces (from `pieces`) that are reserved.
            When a request had to be sent over the overlay the answer is not
            known yet and [] is returned.
        """
        pieces_to_send = []
        result = []
        for piece in pieces:
            if self.is_reserved(piece):
                result.append(piece)
            elif not self.is_ignored(piece):
                pieces_to_send.append(piece)

        #if DEBUG:
        #    print >> sys.stderr,"helper: reserve_pieces: result is",result,"to_send is",pieces_to_send

        # Every piece is already either reserved or ignored: nothing to ask.
        if not pieces_to_send:
            return result

        if self.coordinator is None:
            # Asynchronous path: the reply arrives via got_pieces_reserved().
            self.send_or_queue_reservation(sdownload,pieces_to_send,result)
            return []

        new_reserved_pieces = self.coordinator.reserve_pieces(pieces_to_send, all_or_nothing)
        for piece in new_reserved_pieces:
            self._reserve_piece(piece)

        # Re-evaluate all requested pieces; whatever the coordinator did not
        # grant is remembered as ignored.
        result = []
        for piece in pieces:
            if self.is_reserved(piece):
                result.append(piece)
            else:
                self._ignore_piece(piece)
        return result

    def reserve_piece(self, piece, sdownload):
        """ Single-piece variant of reserve_pieces(); returns True/False. """
        if self.coordinator is not None and self.is_complete():
            return True
        new_reserved_pieces = self.reserve_pieces([piece],sdownload)
        return new_reserved_pieces != []

    ## Synchronization interface
    def send_or_queue_reservation(self,sdownload,pieces_to_send,result):
        """ Records the fact that a SingleDownload wants to reserve a
            piece with the coordinator. If it's the first, send the
            actual reservation request.

            `result` is accepted but not used.
        """
        if sdownload not in self.continuations:
            if DEBUG:
                print >> sys.stderr,"helper: Queuing reservation for",pieces_to_send
            self.continuations.append(sdownload)
            sdownload.helper_set_freezing(True)
        if len(self.continuations) > 0:
            self.send_reservation(pieces_to_send)

    def send_reservation(self,pieces_to_send):
        """ Send a reservation request for pieces_to_send, provided no
            request is outstanding or the outstanding one is older than 60
            seconds. Expects self.continuations to be non-empty.
        """
        # Arno: I sometimes see no reply to a RESERVE_PIECE and the client
        # stops acquiring new pieces. The last_req_time is supposed
        # to fix this.
        waited = int(time())-self.last_req_time
        if self.outstanding is None or waited > 60:
            self.counter += 1
            self.last_req_time = int(time())
            if DEBUG:
                if self.outstanding is None:
                    print >> sys.stderr,"helper: Sending reservation for",pieces_to_send,"because none"
                else:
                    print >> sys.stderr,"helper: Sending reservation for",pieces_to_send,"because timeout"
            sdownload = self.continuations.pop(0)
            if self.outstanding is not None: # allow bypassed conn to restart
                self.outstanding.helper_set_freezing(False)
            self.outstanding = sdownload
            self.send_reserve_pieces(pieces_to_send)

    def notify(self):
        """ Called by HelperMessageHandler to "wake up" the download that's
            waiting for its coordinator to reserve it a piece
        """
        if self.outstanding is None:
            if DEBUG:
                print >> sys.stderr,"helper: notify: No continuation waiting???"
        else:
            if DEBUG:
                print >> sys.stderr,"helper: notify: Waking downloader"
            sdownload = self.outstanding
            self.outstanding = None # must be None before calling self.restart!
            self.restart(sdownload)

            #self.send_reservation()
            waiting = self.continuations[:] # copy just to be sure
            self.continuations = []
            for sdownload in waiting:
                self.restart(sdownload)

    def restart(self,sdownload):
        """ Unfreeze sdownload and let it request more pieces. """
        # Chokes can get in while we're waiting for reply from coordinator.
        # But as we were called from _request_more() we were not choked
        # just before, so pretend we didn't see the message yet.
        if sdownload.is_choked():
            sdownload.helper_forces_unchoke()
        sdownload.helper_set_freezing(False)
        sdownload._request_more()

    ## Coordinator comm.
    def send_reserve_pieces(self, pieces, all_or_nothing = False):
        """ Queue a RESERVE_PIECES message for the coordinator.

            Payload layout: torrent_hash, one flag byte (chr(1) when
            all_or_nothing is true, chr(0) otherwise), bencoded piece list.
        """
        if all_or_nothing:
            flag = chr(1)
        else:
            flag = chr(0)
        payload = self.torrent_hash + flag + bencode(pieces)
        self.secure_overlay.addTask(self.coordinator_permid, RESERVE_PIECES + payload )

    ### HelperMessageHandler interface
    def got_pieces_reserved(self, permid, pieces):
        """ Process the coordinator's reply and (re)start the data
            connection. `permid` is not used here.
        """
        self.handle_pieces_reserved(pieces)
        self.start_data_connection()

    def handle_pieces_reserved(self,pieces):
        """ Apply a reply from the coordinator: positive entries are
            reserved, all other entries are negated and ignored.

            Exceptions are printed to stderr and swallowed.
        """
        if DEBUG:
            print >> sys.stderr,"helper: Coordinator replied",pieces
        try:
            for piece in pieces:
                # NOTE(review): with this encoding piece 0 can never be
                # reported as reserved (0 > 0 is false) - confirm against
                # the coordinator side.
                if piece > 0:
                    self._reserve_piece(piece)
                else:
                    self._ignore_piece(-piece)
            self.counter -= 1
        except Exception:
            # Fetch the exception via sys.exc_info() rather than binding it
            # in the except clause, which has version-specific syntax.
            e = sys.exc_info()[1]
            print_exc()
            print >> sys.stderr,"helper: Exception in handle_pieces_reserved",e

    def start_data_connection(self):
        """ Ask the encoder to connect to the coordinator's (ip, port). """
        # Do this always, will return quickly when connection already exists
        dns = (self.coordinator_ip, self.coordinator_port)
        if DEBUG:
            print >> sys.stderr,"helper: Starting data connection to coordinator",dns
        self.encoder.start_connection(dns,id = None,coord_con = True)