Parallel Databases: Interquery Parallelism

Interquery Parallelism

In interquery parallelism, different queries or transactions execute in parallel with one another. Transaction throughput can be increased by this form of parallelism. However, the response times of individual transactions are no faster than they would be if the transactions were run in isolation. Thus, the primary use of interquery parallelism is to scale up a transaction-processing system to support a larger number of transactions per second.

Interquery parallelism is the easiest form of parallelism to support in a database system — particularly in a shared-memory parallel system. Database systems designed for single-processor systems can be used with few or no changes on a shared-memory parallel architecture, since even sequential database systems support concurrent processing. Transactions that would have operated in a time-shared concurrent manner on a sequential machine operate in parallel in the shared-memory parallel architecture.

Supporting interquery parallelism is more complicated in a shared-disk or shared-nothing architecture. Processors have to perform some tasks, such as locking and logging, in a coordinated fashion, and that requires that they pass messages to each other. A parallel database system must also ensure that two processors do not update the same data independently at the same time. Further, when a processor accesses or updates data, the database system must ensure that the processor has the latest version of the data in its buffer pool. The problem of ensuring that the version is the latest is known as the cache-coherency problem.

Various protocols are available to guarantee cache coherency; often, cache-coherency protocols are integrated with concurrency-control protocols so that their overhead is reduced. One such protocol for a shared-disk system is this:

1. Before any read or write access to a page, a transaction locks the page in shared or exclusive mode, as appropriate. Immediately after the transaction obtains either a shared or exclusive lock on a page, it also reads the most recent copy of the page from the shared disk.

2. Before a transaction releases an exclusive lock on a page, it flushes the page to the shared disk; then, it releases the lock.

This protocol ensures that, when a transaction sets a shared or exclusive lock on a page, it gets the correct copy of the page.
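The two rules of this protocol can be simulated in a few lines. The sketch below is a minimal, single-threaded model under stated assumptions: the shared disk is a Python dict, `LockManager`, `Processor`, and `LockConflict` are hypothetical names introduced for illustration, and an incompatible lock request raises an exception instead of waiting as it would in a real lock manager.

```python
from dataclasses import dataclass, field


class LockConflict(Exception):
    """Raised when a lock request is incompatible with locks held by others.
    (A real lock manager would make the requester wait instead.)"""


@dataclass
class LockManager:
    # page id -> (mode, set of holder names); mode is "S" or "X"
    table: dict = field(default_factory=dict)

    def acquire(self, page, mode, who):
        held = self.table.get(page)
        if held is None:
            self.table[page] = (mode, {who})
            return
        held_mode, holders = held
        if mode == "S" and held_mode == "S":
            holders.add(who)                      # shared locks are compatible
        elif holders == {who}:
            # the sole holder may re-request or upgrade S -> X
            new_mode = "X" if "X" in (mode, held_mode) else "S"
            self.table[page] = (new_mode, holders)
        else:
            raise LockConflict(f"{who} cannot get {mode} lock on {page}")

    def release(self, page, who):
        _, holders = self.table[page]
        holders.discard(who)
        if not holders:
            del self.table[page]


@dataclass
class Processor:
    name: str
    disk: dict                                    # the shared disk, common to all processors
    locks: LockManager
    buffer: dict = field(default_factory=dict)    # this processor's private buffer pool

    def lock(self, page, mode):
        self.locks.acquire(page, mode, self.name)
        # Rule 1: immediately after locking, read the latest copy from the shared disk.
        self.buffer[page] = self.disk.get(page)

    def read(self, page):
        return self.buffer[page]

    def write(self, page, value):
        self.buffer[page] = value                 # caller must hold an X lock

    def unlock(self, page):
        mode, _ = self.locks.table[page]
        if mode == "X":
            # Rule 2: flush the page to the shared disk before releasing the X lock.
            self.disk[page] = self.buffer[page]
        self.locks.release(page, self.name)


disk = {"A": 10}
locks = LockManager()
proc0, proc1 = Processor("P0", disk, locks), Processor("P1", disk, locks)
proc1.lock("A", "S")
proc1.unlock("A")                                 # proc1's buffer still holds 10
proc0.lock("A", "X")
proc0.write("A", 11)
proc0.unlock("A")                                 # 11 is flushed to the shared disk
proc1.lock("A", "S")
print(proc1.read("A"))                            # 11: the stale buffered copy was refreshed
```

Without rule 1, `proc1` would keep reading its stale buffered value 10; without rule 2, the value 11 would never reach the shared disk.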

More complex protocols avoid the repeated reading and writing to disk required by the preceding protocol. Such protocols do not write pages to disk when exclusive locks are released. When a shared or exclusive lock is obtained, if the most recent version of a page is in the buffer pool of some processor, the page is obtained from there. The protocols have to be designed to handle concurrent requests. The shared-disk protocols can be extended to shared-nothing architectures by this scheme: Each page has a home processor Pi, and is stored on disk Di. When other processors want to read or write the page, they send requests to the home processor Pi of the page, since they cannot directly communicate with the disk. The other actions are the same as in the shared-disk protocols.

The Oracle 8 and Oracle Rdb systems are examples of shared-disk parallel database systems that support interquery parallelism.

Intraquery Parallelism

Intraquery parallelism refers to the execution of a single query in parallel on multiple processors and disks. Using intraquery parallelism is important for speeding up long-running queries. Interquery parallelism does not help in this task, since each query is run sequentially.

To illustrate the parallel evaluation of a query, consider a query that requires a relation to be sorted. Suppose that the relation has been partitioned across multiple disks by range partitioning on some attribute, and the sort is requested on the partitioning attribute. The sort operation can be implemented by sorting each partition in parallel, then concatenating the sorted partitions to get the final sorted relation.

Thus, we can parallelize a query by parallelizing individual operations. There is another source of parallelism in evaluating a query: The operator tree for a query can contain multiple operations. We can parallelize the evaluation of the operator tree by evaluating in parallel some of the operations that do not depend on one another. Further, as Chapter 13 mentions, we may be able to pipeline the output of one operation to another operation. The two operations can be executed in parallel on separate processors, one generating output that is consumed by the other, even as it is generated.

In summary, the execution of a single query can be parallelized in two ways:

Intraoperation parallelism. We can speed up processing of a query by parallelizing the execution of each individual operation, such as sort, select, project, and join. We consider intraoperation parallelism in Section 20.5.

Interoperation parallelism. We can speed up processing of a query by executing in parallel the different operations in a query expression. We consider this form of parallelism in Section 20.6.

The two forms of parallelism are complementary, and can be used simultaneously on a query. Since the number of operations in a typical query is small, compared to the number of tuples processed by each operation, the first form of parallelism can scale better with increasing parallelism. However, with the relatively small number of processors in typical parallel systems today, both forms of parallelism are important.

In the following discussion of parallelization of queries, we assume that the queries are read only. The choice of algorithms for parallelizing query evaluation depends on the machine architecture. Rather than presenting algorithms for each architecture separately, we use a shared-nothing architecture model in our description. Thus, we explicitly describe when data have to be transferred from one processor to another. We can simulate this model easily by using the other architectures, since transfer of data can be done via shared memory in a shared-memory architecture, and via shared disks in a shared-disk architecture. Hence, algorithms for shared-nothing architectures can be used on the other architectures too. We mention occasionally how the algorithms can be further optimized for shared-memory or shared-disk systems.

To simplify the presentation of the algorithms, assume that there are n processors, P0, P1, ..., Pn−1, and n disks D0, D1, ..., Dn−1, where disk Di is associated with processor Pi. A real system may have multiple disks per processor. It is not hard to extend the algorithms to allow multiple disks per processor: We simply allow Di to be a set of disks. However, for simplicity, we assume here that Di is a single disk.

Intraoperation Parallelism

Since relational operations work on relations containing large sets of tuples, we can parallelize the operations by executing them in parallel on different subsets of the relations. Since the number of tuples in a relation can be large, the degree of parallelism is potentially enormous. Thus, intraoperation parallelism is natural in a database system. We shall study parallel versions of some common relational operations in Sections 20.5.1 through 20.5.3.

Parallel Sort

Suppose that we wish to sort a relation that resides on n disks D0, D1, ..., Dn−1. If the relation has been range partitioned on the attributes on which it is to be sorted, then, as noted in Section 20.2.2, we can sort each partition separately, and can concatenate the results to get the full sorted relation. Since the tuples are partitioned on n disks, the time required for reading the entire relation is reduced by the parallel access.

If the relation has been partitioned in any other way, we can sort it in one of two ways:

1. We can range partition it on the sort attributes, and then sort each partition separately.

2. We can use a parallel version of the external sort–merge algorithm.

Range-Partitioning Sort

Range-partitioning sort works in two steps: first range partitioning the relation, then sorting each partition separately. When we sort by range partitioning the relation, it is not necessary to range-partition the relation on the same set of processors or disks as those on which that relation is stored. Suppose that we choose processors P0, P1, ..., Pm, where m < n, to sort the relation. There are two steps involved in this operation:

1. Redistribute the tuples in the relation, using a range-partition strategy, so that all tuples that lie within the ith range are sent to processor Pi, which stores the relation temporarily on disk Di.

To implement range partitioning, every processor reads, in parallel, the tuples from its disk and sends each tuple to its destination processor. Each processor P0, P1, ..., Pm also receives tuples belonging to its partition, and stores them locally. This step requires disk I/O and communication overhead.

2. Each of the processors sorts its partition of the relation locally, without interaction with the other processors. Each processor executes the same operation, namely sorting, on a different data set. (Execution of the same operation in parallel on different sets of data is called data parallelism.)

The final merge operation is trivial, because the range partitioning in the first phase ensures that, for 0 ≤ i < j ≤ m, the key values in processor Pi are all less than the key values in Pj.

We must do range partitioning with a good range-partition vector, so that each partition will have approximately the same number of tuples. Virtual processor partitioning can also be used to reduce skew.
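The two steps of range-partitioning sort can be sketched as a single-machine simulation. It assumes each list in `disks` stands for the tuples on one disk Di and that a partition vector is already given; a vector with m values yields m + 1 partitions, one for each of P0, ..., Pm. The function names are hypothetical, and the thread pool only mimics the independence of the local sorts (Python threads will not speed up CPU-bound sorting).

```python
import bisect
from concurrent.futures import ThreadPoolExecutor


def range_partition(tuples, vector, key):
    """Split tuples by a partition vector [v0, v1, ...]: partition 0 gets keys < v0,
    partition i gets keys in [v(i-1), v(i)), and the last gets keys >= the last value."""
    parts = [[] for _ in range(len(vector) + 1)]
    for t in tuples:
        parts[bisect.bisect_right(vector, key(t))].append(t)
    return parts


def range_partitioning_sort(disks, vector, key=lambda t: t):
    targets = [[] for _ in range(len(vector) + 1)]   # one per sorting processor P0..Pm
    # Step 1: every source processor scans its own disk and ships each tuple
    # to the processor that owns the tuple's range.
    for disk in disks:
        for i, part in enumerate(range_partition(disk, vector, key)):
            targets[i].extend(part)
    # Step 2: each target sorts its partition with no interaction (data parallelism).
    with ThreadPoolExecutor() as pool:
        runs = list(pool.map(lambda p: sorted(p, key=key), targets))
    # The "merge" is plain concatenation: all keys at Pi precede all keys at Pj for i < j.
    return [t for run in runs for t in run]


print(range_partitioning_sort([[5, 1, 9], [3, 7, 2], [8, 6, 4]], vector=[4, 7]))
# [1, 2, 3, 4, 5, 6, 7, 8, 9]
```

The quality of `vector` decides how evenly the sorting work is spread; the sketch does not choose it.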

Parallel External Sort–Merge

Parallel external sort–merge is an alternative to range partitioning. Suppose that a relation has already been partitioned among disks D0, D1, ..., Dn−1 (it does not matter how the relation has been partitioned). Parallel external sort–merge then works this way:

1. Each processor Pi locally sorts the data on disk Di.

2. The system then merges the sorted runs on each processor to get the final sorted output.

The merging of the sorted runs in step 2 can be parallelized by this sequence of actions:

1. The system range-partitions the sorted partitions at each processor Pi (all by the same partition vector) across the processors P0, P1,... , Pm−1. It sends the tuples in sorted order, so that each processor receives the tuples in sorted streams.

2. Each processor Pi performs a merge on the streams as they are received, to get a single sorted run.

3. The system concatenates the sorted runs on processors P0, P1,... , Pm−1 to get the final result.

As described, this sequence of actions results in an interesting form of execution skew, since at first every processor sends all blocks of partition 0 to P0, then every processor sends all blocks of partition 1 to P1, and so on. Thus, while sending happens in parallel, receiving tuples becomes sequential: first only P0 receives tuples, then only P1 receives tuples, and so on. To avoid this problem, each processor repeatedly sends a block of data to each partition. In other words, each processor sends the first block of every partition, then sends the second block of every partition, and so on. As a result, all processors receive data in parallel.
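The sketch below simulates parallel external sort–merge, including the parallelized merge and the interleaved sending order just described. It assumes a shared partition vector is given and models a block as a slice of `block_size` tuples (a hypothetical parameter); `schedule` records (round, sender, receiver) triples so the effect of interleaving can be inspected. For simplicity each receiver merges after all of its blocks have arrived, whereas a real system merges the streams as they arrive.

```python
import bisect
import heapq


def split_into_blocks(seq, block_size):
    """Cut a sorted sequence into consecutive blocks of at most block_size tuples."""
    return [seq[i:i + block_size] for i in range(0, len(seq), block_size)]


def parallel_external_sort_merge(disks, vector, block_size=2):
    m = len(vector) + 1                            # receivers P0..Pm-1
    # Step 1: each processor Pi sorts the data on its own disk Di.
    local_runs = [sorted(d) for d in disks]
    # Range-partition every sorted run with the SAME vector; each slice stays sorted.
    outgoing = []                                  # outgoing[i][j]: blocks Pi sends to Pj
    for run in local_runs:
        cuts = [0] + [bisect.bisect_left(run, v) for v in vector] + [len(run)]
        outgoing.append([split_into_blocks(run[cuts[j]:cuts[j + 1]], block_size)
                         for j in range(m)])
    # Interleaved sending: in round r every sender ships its r-th block of EVERY
    # partition, so all receivers get data in the same round.
    inbox = [[[] for _ in disks] for _ in range(m)]  # inbox[j][i]: stream from Pi to Pj
    schedule = []
    rounds = max((len(blocks) for row in outgoing for blocks in row), default=0)
    for r in range(rounds):
        for i, row in enumerate(outgoing):
            for j, blocks in enumerate(row):
                if r < len(blocks):
                    inbox[j][i].extend(blocks[r])
                    schedule.append((r, i, j))
    # Step 2: each receiver Pj merges its sorted incoming streams into one run.
    runs = [list(heapq.merge(*streams)) for streams in inbox]
    # Step 3: concatenating the runs of P0, ..., Pm-1 gives the sorted relation.
    return [t for run in runs for t in run], schedule


result, schedule = parallel_external_sort_merge([[5, 1, 9], [3, 7, 2], [8, 6, 4]], [4, 7])
print(result)                                          # [1, 2, 3, 4, 5, 6, 7, 8, 9]
print(sorted({j for r, i, j in schedule if r == 0}))   # [0, 1, 2]: every receiver busy in round 0
```

Replacing the round-robin loop with "send all blocks for P0, then all blocks for P1, ..." would produce the same output but a schedule in which only one receiver is active at a time.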

Some machines, such as the Teradata DBC series machines, use specialized hardware to perform merging. The Y-net interconnection network in the Teradata DBC machines can merge output from multiple processors to give a single sorted output.

Parallel Join

The join operation requires that the system test pairs of tuples to see whether they satisfy the join condition; if they do, the system adds the pair to the join output. Parallel join algorithms attempt to split the pairs to be tested over several processors. Each processor then computes part of the join locally. Then, the system collects the results from each processor to produce the final result.

Partitioned Join

For certain kinds of joins, such as equi-joins and natural joins, it is possible to partition the two input relations across the processors, and to compute the join locally at each processor. Suppose that we are using n processors, and that the relations to be joined are r and s. Partitioned join then works this way: The system partitions the relations r and s each into n partitions, denoted r0, r1, ..., rn−1 and s0, s1, ..., sn−1. The system sends partitions ri and si to processor Pi, where their join is computed locally.

The partitioned join technique works correctly only if the join is an equi-join (for example, $r \bowtie_{r.A = s.B} s$) and if we partition r and s by the same partitioning function on their join attributes. The idea of partitioning is exactly the same as that behind the partitioning step of hash–join. In a partitioned join, however, there are two different ways of partitioning r and s:

• Range partitioning on the join attributes

• Hash partitioning on the join attributes

In either case, the same partitioning function must be used for both relations. For range partitioning, the same partition vector must be used for both relations. For hash partitioning, the same hash function must be used on both relations. Figure 20.2 depicts the partitioning in a partitioned parallel join.

Once the relations are partitioned, we can use any join technique locally at each processor Pi to compute the join of ri and si. For example, hash–join, merge–join, or nested-loop join could be used. Thus, we can use partitioning to parallelize any join technique.

[Figure 20.2: Partitioned parallel join]

If one or both of the relations r and s are already partitioned on the join attributes (by either hash partitioning or range partitioning), the work needed for partitioning is reduced greatly. If the relations are not partitioned, or are partitioned on attributes other than the join attributes, then the tuples need to be repartitioned. Each processor Pi reads in the tuples on disk Di, computes for each tuple t the partition j to which t belongs, and sends tuple t to processor Pj. Processor Pj stores the tuples on disk Dj.
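As an illustration, here is a minimal hash-partitioned equi-join $r \bowtie_{r.A = s.B} s$. It assumes tuples are Python tuples, `rkey` and `skey` extract the join attributes, and Python's built-in `hash` stands in for the shared partitioning function; all function names are hypothetical. The local join happens to be an in-memory hash join, but any local technique would do.

```python
from collections import defaultdict


def hash_partition(rel, key, n):
    """Send each tuple to partition hash(join value) mod n."""
    parts = [[] for _ in range(n)]
    for t in rel:
        parts[hash(key(t)) % n].append(t)
    return parts


def local_hash_join(ri, si, rkey, skey):
    """Any local join technique works; this one builds a hash table on si."""
    table = defaultdict(list)
    for u in si:
        table[skey(u)].append(u)
    return [(t, u) for t in ri for u in table.get(rkey(t), [])]


def partitioned_join(r, s, rkey, skey, n):
    # The SAME partitioning function must be applied to both relations.
    r_parts = hash_partition(r, rkey, n)
    s_parts = hash_partition(s, skey, n)
    result = []
    for i in range(n):                       # work done independently at each Pi
        result.extend(local_hash_join(r_parts[i], s_parts[i], rkey, skey))
    return result


r = [(1, "a"), (2, "b"), (3, "c")]           # r(A, C): join attribute A is position 0
s = [("x", 2), ("y", 3), ("z", 3), ("w", 4)] # s(D, B): join attribute B is position 1
print(sorted(partitioned_join(r, s, rkey=lambda t: t[0], skey=lambda u: u[1], n=3)))
```

Partitioning r and s with different functions would send matching tuples to different processors and silently lose result pairs, which is why the text insists on one shared function.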

We can optimize the join algorithm used locally at each processor to reduce I/O by buffering some of the tuples in memory, instead of writing them to disk. We describe such optimizations in Section 20.5.2.3.

Skew presents a special problem when range partitioning is used, since a partition vector that splits one relation of the join into equal-sized partitions may split the other relation into partitions of widely varying size. The partition vector should be such that |ri| + |si| (that is, the sum of the sizes of ri and si) is roughly equal over all i = 0, 1, ..., n−1. With a good hash function, hash partitioning is likely to have a smaller skew, except when there are many tuples with the same values for the join attributes.
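One simple way to pick such a vector, sketched below, is to sort the join-attribute values of both relations together and cut at equal-count positions. This assumes those values are available (in practice, perhaps from a sample); the function names are hypothetical. Tuples with equal join values cannot be split across partitions, so heavy duplicates still cause skew.

```python
import bisect


def partition_sizes(keys, vector):
    """Count how many keys fall in each range (keys < v0 go to partition 0)."""
    sizes = [0] * (len(vector) + 1)
    for k in keys:
        sizes[bisect.bisect_right(vector, k)] += 1
    return sizes


def balanced_vector(r_keys, s_keys, n):
    """Choose up to n-1 boundaries so that |ri| + |si| is roughly equal."""
    combined = sorted(list(r_keys) + list(s_keys))
    vector = []
    for k in range(1, n):
        v = combined[k * len(combined) // n]
        if not vector or v > vector[-1]:      # boundaries must strictly increase
            vector.append(v)
    return vector


r_keys = [1, 2, 3, 4, 5, 6, 7, 8]             # uniformly spread join values
s_keys = [1, 1, 1, 1, 1, 1, 1, 2]             # heavily skewed join values


def combined_sizes(vector):
    return [a + b for a, b in zip(partition_sizes(r_keys, vector),
                                  partition_sizes(s_keys, vector))]


print(combined_sizes([5]))                                  # splits r evenly: [12, 4]
print(combined_sizes(balanced_vector(r_keys, s_keys, 2)))   # [8, 8]
```

The vector [5] splits r into 4 + 4 tuples but puts all of s in partition 0, while the combined cut at 2 balances the total work.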

Fragment-and-Replicate Join

Partitioning is not applicable to all types of joins. For instance, if the join condition is an inequality, such as $r \bowtie_{r.a < s.b} s$, it is possible that all tuples in r join with some tuple in s (and vice versa). Thus, there may be no easy way of partitioning r and s so that tuples in partition ri join with only tuples in partition si.

We can parallelize such joins by using a technique called fragment and replicate. We first consider a special case of fragment and replicate — asymmetric fragment-and-replicate join — which works as follows.

1. The system partitions one of the relations — say, r. Any partitioning technique can be used on r, including round-robin partitioning.

2. The system replicates the other relation, s, across all the processors.

3. Processor Pi then locally computes the join of ri with all of s, using any join technique.

The asymmetric fragment-and-replicate scheme appears in Figure 20.3a. If r is already stored by partitioning, there is no need to partition it further in step 1. All that is required is to replicate s across all processors.

The general case of fragment and replicate join appears in Figure 20.3b; it works this way: The system partitions relation r into n partitions, r0, r1, ..., rn−1, and partitions s into m partitions, s0, s1, ..., sm−1. As before, any partitioning technique may be used on r and on s. The values of m and n do not need to be equal, but they must be chosen so that there are at least m × n processors. Asymmetric fragment and replicate is simply a special case of general fragment and replicate, where m = 1.

Fragment and replicate reduces the sizes of the relations at each processor, compared to asymmetric fragment and replicate.

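[Figure 20.3: Fragment-and-replicate schemes: (a) asymmetric fragment and replicate; (b) fragment and replicate]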

Let the processors be P0,0, P0,1, ..., P0,m−1, P1,0, ..., Pn−1,m−1. Processor Pi,j computes the join of ri with sj. Each processor must get the tuples in the partitions it works on. To do so, the system replicates ri to processors Pi,0, Pi,1, ..., Pi,m−1 (which form a row in Figure 20.3b), and replicates sj to processors P0,j, P1,j, ..., Pn−1,j (which form a column in Figure 20.3b). Any join technique can be used at each processor Pi,j.
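The row and column replication can be sketched as below. Under the assumptions of this illustration, round-robin slicing plays the role of "any partitioning technique", the processor grid Pi,j is a dict keyed by (i, j), and `theta` is an arbitrary join predicate; the function names are hypothetical. With m = 1 the same code performs asymmetric fragment and replicate.

```python
def fragment(rel, k):
    """Round-robin partitioning into k fragments."""
    return [rel[i::k] for i in range(k)]


def fragment_and_replicate_join(r, s, theta, n, m):
    r_frags, s_frags = fragment(r, n), fragment(s, m)
    grid = {}
    for i in range(n):
        for j in range(m):
            # P(i,j) receives r_i (replicated along row i) and s_j (replicated
            # along column j); a nested-loop join handles any condition theta.
            grid[(i, j)] = [(a, b) for a in r_frags[i] for b in s_frags[j] if theta(a, b)]
    return grid


r, s = [1, 2, 3, 4], [2, 3, 5]
grid = fragment_and_replicate_join(r, s, lambda a, b: a < b, n=2, m=3)
result = sorted(pair for part in grid.values() for pair in part)
print(result)   # [(1, 2), (1, 3), (1, 5), (2, 3), (2, 5), (3, 5), (4, 5)]
```

Because every r tuple lies in exactly one ri and every s tuple in exactly one sj, each pair is tested at exactly one processor, so the union of the local results contains no duplicates.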

Fragment and replicate works with any join condition, since every tuple in r can be tested with every tuple in s. Thus, it can be used where partitioning cannot be.

Fragment and replicate usually has a higher cost than partitioning when both relations are of roughly the same size, since at least one of the relations has to be replicated. However, if one of the relations — say, s — is small, it may be cheaper to replicate s across all processors, rather than to repartition r and s on the join attributes. In such a case, asymmetric fragment and replicate is preferable, even though partitioning could be used.

Partitioned Parallel Hash–Join

The partitioned hash–join of Section 13.5.5 can be parallelized. Suppose that we have n processors, P0, P1, ..., Pn−1, and two relations r and s, such that the relations r and s are partitioned across multiple disks. Recall from Section 13.5.5 that the smaller relation is chosen as the build relation. If the size of s is less than that of r, the parallel hash–join algorithm proceeds this way:

1. Choose a hash function — say, h1 — that takes the join attribute value of each tuple in r and s and maps the tuple to one of the n processors. Let ri denote the tuples of relation r that are mapped to processor Pi; similarly, let si denote the tuples of relation s that are mapped to processor Pi. Each processor Pi reads the tuples of s that are on its disk Di, and sends each tuple to the appropriate processor on the basis of hash function h1.

2. As the destination processor Pi receives the tuples of si, it further partitions them by another hash function, h2, which the processor uses to compute the hash–join locally. The partitioning at this stage is exactly the same as in the partitioning phase of the sequential hash–join algorithm. Each processor Pi executes this step independently from the other processors.

3. Once the tuples of s have been distributed, the system redistributes the larger relation r across the n processors by the hash function h1, in the same way as before. As it receives each tuple, the destination processor repartitions it by the function h2, just as the probe relation is partitioned in the sequential hash–join algorithm.

4. Each processor Pi executes the build and probe phases of the hash–join algorithm on the local partitions ri and si of r and s to produce a partition of the final result of the hash–join.

The hash–join at each processor is independent of that at other processors, and receiving the tuples of ri and si is similar to reading them from disk. Therefore, any of the optimizations of the hash–join described in Chapter 13 can be applied as well to the parallel case. In particular, we can use the hybrid hash–join algorithm to cache some of the incoming tuples in memory, and thus avoid the costs of writing them and of reading them back in.
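The four steps can be simulated as follows, assuming an equi-join on a value extracted by `key`, `n` processors, and `k` local partitions per processor. `h1` and `h2` are hypothetical hash functions built from Python's `hash`, with `h2` salted so that it does not simply repeat `h1`. Each (processor, local partition) pair is joined with an in-memory build-and-probe hash join; the hybrid in-memory caching mentioned above is not modeled.

```python
from collections import defaultdict


def parallel_hash_join(r, s, key, n, k):
    """Simulate the parallel hash-join with build relation s (the smaller one)."""
    def h1(v):                        # picks the destination processor P0..Pn-1
        return hash(v) % n

    def h2(v):                        # local partitioning, salted to differ from h1
        return hash((v, "h2")) % k

    def distribute(rel):
        # local[i][b]: tuples sent to Pi by h1, then placed in local partition b by h2
        local = [[[] for _ in range(k)] for _ in range(n)]
        for t in rel:
            local[h1(key(t))][h2(key(t))].append(t)
        return local

    s_local = distribute(s)           # steps 1-2: distribute and partition s first
    r_local = distribute(r)           # step 3: then redistribute r the same way
    result = []
    for i in range(n):                # step 4: each Pi works independently
        for b in range(k):
            table = defaultdict(list)
            for u in s_local[i][b]:   # build phase on the s partition
                table[key(u)].append(u)
            for t in r_local[i][b]:   # probe phase with the matching r partition
                result.extend((t, u) for u in table.get(key(t), ()))
    return result


r = [(1, "a"), (2, "b"), (3, "c"), (3, "d")]
s = [(2, "x"), (3, "y")]
print(sorted(parallel_hash_join(r, s, key=lambda t: t[0], n=3, k=2)))
```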

Parallel Nested-Loop Join

To illustrate the use of fragment-and-replicate-based parallelization, consider the case where the relation s is much smaller than relation r. Suppose that relation r is stored by partitioning; the attribute on which it is partitioned does not matter. Suppose too that there is an index on a join attribute of relation r at each of the partitions of relation r.

We use asymmetric fragment and replicate, with relation s being replicated and with the existing partitioning of relation r. Each processor Pj where a partition of relation s is stored reads the tuples of relation s stored in Dj, and replicates the tuples to every other processor Pi. At the end of this phase, relation s is replicated at all sites that store tuples of relation r.

Now, each processor Pi performs an indexed nested-loop join of relation s with the ith partition of relation r. We can overlap the indexed nested-loop join with the distribution of tuples of relation s, to reduce the costs of writing the tuples of relation s to disk, and of reading them back. However, the replication of relation s must be synchronized with the join so that there is enough space in the in-memory buffers at each processor Pi to hold the tuples of relation s that have been received but that have not yet been used in the join.
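A minimal sketch of this parallel indexed nested-loop join follows. It assumes an equi-join, uses a dict from join value to tuples as a stand-in for the existing index on each partition of r, and simulates replication by concatenating the partitions of s; all names are hypothetical. A real system would stream s into each Pi and probe the index as tuples arrive, with the buffer synchronization described above.

```python
from collections import defaultdict


def build_index(partition, key):
    """Stand-in for the pre-existing index on r's join attribute at one site."""
    index = defaultdict(list)
    for t in partition:
        index[key(t)].append(t)
    return index


def parallel_indexed_nl_join(r_partitions, s_partitions, rkey, skey):
    indexes = [build_index(p, rkey) for p in r_partitions]   # one per processor Pi
    # Replication: every Pj holding part of s sends its tuples to every Pi,
    # so each Pi ends up with all of s.
    replicated_s = [u for p in s_partitions for u in p]
    result = []
    for index in indexes:                     # the work done at each Pi
        for u in replicated_s:                # outer loop: replicated s tuples
            for t in index.get(skey(u), ()):  # inner: index lookup in the local r partition
                result.append((t, u))
    return result


r_partitions = [[(1, "a"), (3, "c")], [(2, "b"), (3, "cc")]]   # arbitrary existing partitioning
s_partitions = [[(3, "x")], [(2, "y")]]
print(sorted(parallel_indexed_nl_join(r_partitions, s_partitions,
                                      rkey=lambda t: t[0], skey=lambda u: u[0])))
```

Note that r is never moved: only the small relation s crosses the network, which is the point of choosing asymmetric fragment and replicate here.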

Other Relational Operations

The evaluation of other relational operations also can be parallelized:

Selection. Let the selection be σθ(r). Consider first the case where θ is of the form ai = v, where ai is an attribute and v is a value. If the relation r is partitioned on ai, the selection proceeds at a single processor. If θ is of the form l ≤ ai ≤ u (that is, θ is a range selection) and the relation has been range-partitioned on ai, then the selection proceeds at each processor whose partition overlaps with the specified range of values. In all other cases, the selection proceeds in parallel at all the processors.

Duplicate elimination. Duplicates can be eliminated by sorting; either of the parallel sort techniques can be used, optimized to eliminate duplicates as soon as they appear during sorting. We can also parallelize duplicate elimination by partitioning the tuples (by either range or hash partitioning) and eliminating duplicates locally at each processor.

Projection. Projection without duplicate elimination can be performed as tuples are read in from disk in parallel. If duplicates are to be eliminated, either of the techniques just described can be used.

Aggregation. Consider an aggregation operation. We can parallelize the operation by partitioning the relation on the grouping attributes, and then computing the aggregate values locally at each processor. Either hash partitioning or range partitioning can be used. If the relation is already partitioned on the grouping attributes, the first step can be skipped.

We can reduce the cost of transferring tuples during partitioning by partly computing aggregate values before partitioning, at least for the commonly used aggregate functions. Consider an aggregation operation on a relation r, using the sum aggregate function on attribute B, with grouping on attribute A. The system can perform the operation at each processor Pi on those r tuples stored on disk Di. This computation results in tuples with partial sums at each processor; there is one tuple at Pi for each value for attribute A present in r tuples stored on Di. The system partitions the result of the local aggregation on the grouping attribute A, and performs the aggregation again (on tuples with the partial sums) at each processor Pi to get the final result.

As a result of this optimization, fewer tuples need to be sent to other processors during partitioning. This idea can be extended easily to the min and max aggregate functions. Extensions to the count and avg aggregate functions are left for you to do in Exercise 20.8.
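The sum case can be sketched as follows, assuming each disk holds (A, B) pairs and Python's `hash` stands in for the hash partitioning on A; the function names are hypothetical. The `shipped` counter makes the saving visible: only one partial-sum tuple per distinct A value per disk crosses the network.

```python
from collections import Counter


def local_sums(tuples):
    """Aggregate (A, B) pairs into one (A, sum of B) entry per distinct A."""
    partial = Counter()
    for a, b in tuples:
        partial[a] += b
    return partial


def parallel_sum_by_group(disks, n):
    # Phase 1: each Pi pre-aggregates the r tuples on its own disk Di.
    partials = [local_sums(d) for d in disks]
    # Phase 2: partition the partial sums on the grouping attribute A.
    inbound = [[] for _ in range(n)]
    shipped = 0
    for p in partials:
        for a, subtotal in p.items():
            inbound[hash(a) % n].append((a, subtotal))
            shipped += 1
    # Phase 3: each Pi adds up the partial sums for the groups it owns.
    final = {}
    for part in inbound:
        final.update(local_sums(part))   # groups at different Pi are disjoint
    return final, shipped


final, shipped = parallel_sum_by_group(
    [[("x", 1), ("x", 2), ("y", 5)], [("x", 4), ("y", 1), ("y", 1)]], n=2)
print(final, shipped)   # sums x -> 7 and y -> 7; 4 tuples shipped instead of 6
```

Replacing `+=` with `min` or `max` gives the corresponding aggregates in the same two-phase shape.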

The parallelization of other operations is covered in several of the exercises.

Cost of Parallel Evaluation of Operations

We achieve parallelism by partitioning the I/O among multiple disks, and partitioning the CPU work among multiple processors. If such a split is achieved without any overhead, and if there is no skew in the splitting of work, a parallel operation using n processors will take 1/n times as long as the same operation on a single processor. We already know how to estimate the cost of an operation such as a join or a selection. The time cost of parallel processing would then be 1/n of the time cost of sequential processing of the operation.

We must also account for the following costs:

• Startup costs for initiating the operation at multiple processors

• Skew in the distribution of work among the processors, with some processors getting a larger number of tuples than others

• Contention for resources (such as memory, disk, and the communication network), resulting in delays

• Cost of assembling the final result by transmitting partial results from each processor

The time taken by a parallel operation can be estimated as

$$T_{part} + T_{asm} + \max(T_0, T_1, \ldots, T_{n-1})$$

where Tpart is the time for partitioning the relations, Tasm is the time for assembling the results, and Ti is the time taken for the operation at processor Pi. Assuming that the tuples are distributed without any skew, the number of tuples sent to each processor can be estimated as 1/n of the total number of tuples. Ignoring contention, the cost Ti of the operations at each processor Pi can then be estimated by the techniques in Chapter 13.

The preceding estimate will be an optimistic estimate, since skew is common. Even though breaking down a single query into a number of parallel steps reduces the size of the average step, it is the time for processing the single slowest step that determines the time taken for processing the query as a whole. A partitioned parallel evaluation, for instance, is only as fast as the slowest of the parallel executions. Thus, any skew in the distribution of the work across processors greatly affects performance.
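A small worked example makes the effect of skew concrete. The numbers are hypothetical: 100 seconds of sequential work split across n = 4 processors, with Tpart = 5 and Tasm = 2 seconds, once evenly and once with one processor receiving 40 percent of the tuples.

```python
def parallel_time(t_part, t_asm, per_processor):
    """Estimate Tpart + Tasm + max(T0, ..., Tn-1)."""
    return t_part + t_asm + max(per_processor)


no_skew = [25, 25, 25, 25]     # 100 s of work split evenly
skewed = [40, 20, 20, 20]      # the same 100 s, but P0 received 40% of the tuples
print(parallel_time(5, 2, no_skew))   # 32
print(parallel_time(5, 2, skewed))    # 47
```

The total work is identical in both cases, yet the skewed run takes about 47 percent longer because the estimate is governed by the slowest processor.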

The problem of skew in partitioning is closely related to the problem of partition overflow in sequential hash–joins (Chapter 13). We can use overflow resolution and avoidance techniques developed for hash–joins to handle skew when hash partitioning is used. We can use balanced range partitioning and virtual processor partitioning to minimize skew due to range partitioning, as in Section 20.2.3.

Interoperation Parallelism

There are two forms of interoperation parallelism: pipelined parallelism and independent parallelism.

Pipelined Parallelism

As discussed in Chapter 13, pipelining forms an important source of economy of computation for database query processing. Recall that, in pipelining, the output tuples of one operation, A, are consumed by a second operation, B, even before the first operation has produced the entire set of tuples in its output. The major advantage of pipelined execution in a sequential evaluation is that we can carry out a sequence of such operations without writing any of the intermediate results to disk.

Parallel systems use pipelining primarily for the same reason that sequential systems do. However, pipelines are a source of parallelism as well, in the same way that instruction pipelines are a source of parallelism in hardware design. It is possible to run operations A and B simultaneously on different processors, so that B consumes tuples in parallel with A producing them. This form of parallelism is called pipelined parallelism.

Consider a join of four relations:

$$r_1 \bowtie r_2 \bowtie r_3 \bowtie r_4$$

We can set up a pipeline that allows the three joins to be computed in parallel. Suppose processor P1 is assigned the computation of $temp_1 \leftarrow r_1 \bowtie r_2$, and P2 is assigned the computation of $r_3 \bowtie temp_1$. As P1 computes tuples in $r_1 \bowtie r_2$, it makes these tuples available to processor P2. Thus, P2 has available to it some of the tuples in $r_1 \bowtie r_2$ before P1 has finished its computation. P2 can use those tuples that are available to begin computation of $temp_1 \bowtie r_3$, even before $r_1 \bowtie r_2$ is fully computed by P1. Likewise, as P2 computes tuples in $(r_1 \bowtie r_2) \bowtie r_3$, it makes these tuples available to P3, which computes the join of these tuples with r4.
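This three-stage pipeline can be mimicked with threads connected by queues. The sketch rests on several assumptions: the relations form a chain r1(A, B), r2(B, C), r3(C, D), r4(D, E), so every join matches the last attribute of the left tuple with the first attribute of the right tuple; each stage holds its right-hand relation in memory while the left input streams in; and `join_stage`, `pipelined_join`, and `DONE` are hypothetical names. Because of Python's global interpreter lock, the threads show the producer-consumer structure rather than a real speedup.

```python
import queue
import threading

DONE = object()                              # end-of-stream marker


def join_stage(inbox, right, outbox):
    """One pipeline stage: join each incoming tuple t with `right` on t[-1] == u[0]
    and pass results downstream at once, before the input is exhausted."""
    index = {}
    for u in right:
        index.setdefault(u[0], []).append(u)
    while (t := inbox.get()) is not DONE:
        for u in index.get(t[-1], ()):
            outbox.put(t + u[1:])
    outbox.put(DONE)


def pipelined_join(r1, r2, r3, r4):
    q0, q1, q2, q3 = (queue.Queue() for _ in range(4))
    stages = [threading.Thread(target=join_stage, args=(q0, r2, q1)),   # "P1": r1 join r2
              threading.Thread(target=join_stage, args=(q1, r3, q2)),   # "P2": temp1 join r3
              threading.Thread(target=join_stage, args=(q2, r4, q3))]   # "P3": ... join r4
    for stage in stages:
        stage.start()
    for t in r1:                             # stream r1 into the first stage
        q0.put(t)
    q0.put(DONE)
    out = []
    while (t := q3.get()) is not DONE:
        out.append(t)
    for stage in stages:
        stage.join()
    return out


r1 = [(1, "a"), (2, "b")]
r2 = [("a", "x"), ("b", "y")]
r3 = [("x", 10), ("y", 20)]
r4 = [(10, "P"), (20, "Q"), (10, "R")]
print(sorted(pipelined_join(r1, r2, r3, r4)))
# [(1, 'a', 'x', 10, 'P'), (1, 'a', 'x', 10, 'R'), (2, 'b', 'y', 20, 'Q')]
```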

Pipelined parallelism is useful with a small number of processors, but does not scale up well. First, pipeline chains generally do not attain sufficient length to provide a high degree of parallelism. Second, it is not possible to pipeline relational operators that do not produce output until all inputs have been accessed, such as the set-difference operation. Third, only marginal speedup is obtained for the frequent cases in which one operator’s execution cost is much higher than are those of the others.

All things considered, when the degree of parallelism is high, pipelining is a less important source of parallelism than partitioning. The real reason for using pipelining is that pipelined executions can avoid writing intermediate results to disk.

Independent Parallelism

Operations in a query expression that do not depend on one another can be executed in parallel. This form of parallelism is called independent parallelism.

Consider the join $r_1 \bowtie r_2 \bowtie r_3 \bowtie r_4$. Clearly, we can compute $temp_1 \leftarrow r_1 \bowtie r_2$ in parallel with $temp_2 \leftarrow r_3 \bowtie r_4$. When these two computations complete, we compute $temp_1 \bowtie temp_2$.

To obtain further parallelism, we can pipeline the tuples in $temp_1$ and $temp_2$ into the computation of $temp_1 \bowtie temp_2$, which is itself carried out by a pipelined join (Section 13.7.2.2).
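Independent parallelism for this query can be sketched with `concurrent.futures`. The sketch assumes the same kind of chain schema as a pipeline would use, r1(A, B), r2(B, C), r3(C, D), r4(D, E), with every join matching the last attribute of the left tuple against the first attribute of the right tuple; `chain_join` and `independent_parallel_join` are hypothetical names. It waits for both temporaries before the final join instead of pipelining them into it.

```python
from concurrent.futures import ThreadPoolExecutor


def chain_join(left, right):
    """Join on left[-1] == right[0], building a hash table on right."""
    index = {}
    for u in right:
        index.setdefault(u[0], []).append(u)
    return [t + u[1:] for t in left for u in index.get(t[-1], ())]


def independent_parallel_join(r1, r2, r3, r4):
    with ThreadPoolExecutor(max_workers=2) as pool:
        f1 = pool.submit(chain_join, r1, r2)   # temp1 <- r1 join r2
        f2 = pool.submit(chain_join, r3, r4)   # temp2 <- r3 join r4, independently
        temp1, temp2 = f1.result(), f2.result()
    return chain_join(temp1, temp2)            # temp1 join temp2


r1 = [(1, "a"), (2, "b")]
r2 = [("a", "x"), ("b", "y")]
r3 = [("x", 10), ("y", 20)]
r4 = [(10, "P"), (20, "Q"), (10, "R")]
print(sorted(independent_parallel_join(r1, r2, r3, r4)))
# [(1, 'a', 'x', 10, 'P'), (1, 'a', 'x', 10, 'R'), (2, 'b', 'y', 20, 'Q')]
```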

Like pipelined parallelism, independent parallelism does not provide a high degree of parallelism, and is less useful in a highly parallel system, although it is useful with a lower degree of parallelism.

Query Optimization

Query optimizers account in large measure for the success of relational technology. Recall that a query optimizer takes a query and finds the cheapest execution plan among the many possible execution plans that give the same answer.

Query optimizers for parallel query evaluation are more complicated than query optimizers for sequential query evaluation. First, the cost models are more complicated, since partitioning costs have to be accounted for, and issues such as skew and resource contention must be taken into account. More important is the issue of how to parallelize a query. Suppose that we have somehow chosen an expression (from among those equivalent to the query) to be used for evaluating the query. The expression can be represented by an operator tree, as in Section 13.1.

To evaluate an operator tree in a parallel system, we must make the following decisions:

• How to parallelize each operation, and how many processors to use for it

• What operations to pipeline across different processors, what operations to execute independently in parallel, and what operations to execute sequentially, one after the other

These decisions constitute the task of scheduling the execution tree.

Determining the resources of each kind — such as processors, disks, and memory — that should be allocated to each operation in the tree is another aspect of the optimization problem. For instance, it may appear wise to use the maximum amount of parallelism available, but it is a good idea not to execute certain operations in parallel. Operations whose computational requirements are significantly smaller than the communication overhead should be clustered with one of their neighbors. Otherwise, the advantage of parallelism is negated by the overhead of communication.

One concern is that long pipelines do not lend themselves to good resource utilization. Unless the operations are coarse grained, the final operation of the pipeline may wait for a long time to get inputs, while holding precious resources, such as memory. Hence, long pipelines should be avoided.

The number of parallel evaluation plans from which to choose is much larger than the number of sequential evaluation plans. Optimizing parallel queries by considering all alternatives is therefore much more expensive than optimizing sequential queries. Hence, we usually adopt heuristic approaches to reduce the number of parallel execution plans that we have to consider. We describe two popular heuristics here.

The first heuristic is to consider only evaluation plans that parallelize every operation across all processors, and that do not use any pipelining. This approach is used in the Teradata DBC series machines. Finding the best such execution plan is like doing query optimization in a sequential system. The main differences lie in how the partitioning is performed and what cost-estimation formula is used.

The second heuristic is to choose the most efficient sequential evaluation plan, and then to parallelize the operations in that evaluation plan. The Volcano parallel database popularized a model of parallelization called the exchange-operator model.

This model uses existing implementations of operations, operating on local copies of data, coupled with an exchange operation that moves data around between different processors. Exchange operators can be introduced into an evaluation plan to transform it into a parallel evaluation plan.
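A minimal sketch of the exchange idea, assuming hash repartitioning on a grouping key. The names `exchange` and `local_count_by_key` are hypothetical and not Volcano's actual interface. Producers (for example, scans of different disk partitions) emit rows. The exchange routes each row to consumer `hash(key) % num_consumers`. Each consumer then runs an unchanged sequential operator (here a group-by count) on its local rows. Because every row with a given key lands at the same consumer, the union of the local results equals the global result.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List, Tuple

Row = Tuple  # a row is a tuple of column values


def exchange(producer_outputs: List[Iterable[Row]], num_consumers: int,
             key: Callable[[Row], object]) -> List[List[Row]]:
    """Repartition rows from all producers so each key goes to one consumer."""
    consumer_inputs: List[List[Row]] = [[] for _ in range(num_consumers)]
    for stream in producer_outputs:
        for row in stream:
            # Python's % always yields a non-negative index here.
            consumer_inputs[hash(key(row)) % num_consumers].append(row)
    return consumer_inputs


def local_count_by_key(rows: Iterable[Row],
                       key: Callable[[Row], object]) -> Dict[object, int]:
    """An ordinary sequential operator (group-by count) on local data."""
    counts: Dict[object, int] = defaultdict(int)
    for row in rows:
        counts[key(row)] += 1
    return dict(counts)


if __name__ == "__main__":
    # Two producers: scans of two partitions of (name, dept) rows.
    partition_0 = [("Ann", "Sales"), ("Bob", "IT")]
    partition_1 = [("Cid", "Sales"), ("Dee", "HR")]
    dept = lambda row: row[1]
    result: Dict[object, int] = {}
    for rows in exchange([partition_0, partition_1], num_consumers=3, key=dept):
        result.update(local_count_by_key(rows, dept))  # keys are disjoint
    print(sorted(result.items()))  # [('HR', 1), ('IT', 1), ('Sales', 2)]
```

Python randomizes string hashes between runs, so which consumer receives a given department can change from run to run. Within one run, however, equal keys always hash alike, which is all the exchange needs.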

Yet another dimension of optimization is the design of physical-storage organization to speed up queries. The optimal physical organization differs for different queries. The database administrator must choose a physical organization that appears to be good for the expected mix of database queries. Thus, the area of parallel query optimization is complex, and it is still an area of active research.

Design of Parallel Systems

So far this chapter has concentrated on parallelization of data storage and of query processing. Since large-scale parallel database systems are used primarily for storing large volumes of data, and for processing decision-support queries on those data, these topics are the most important in a parallel database system. Parallel loading of data from external sources is an important requirement if we are to handle large volumes of incoming data.

A large parallel database system must also address these availability issues:

• Resilience to failure of some processors or disks

• Online reorganization of data and schema changes

We consider these issues here.

With a large number of processors and disks, the probability that at least one processor or disk will malfunction is significantly greater than in a single-processor system with one disk. A poorly designed parallel system will stop functioning if any component (processor or disk) fails. Assuming that the probability of failure of a single processor or disk is small, the probability of failure of the system goes up linearly with the number of processors and disks. If a single processor or disk would fail once every 5 years, a system with 100 processors would have a failure every 18 days.
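The 18-day figure follows from treating component failures as independent and rare. In that case failure rates add, so a system that stops when any one of n components fails has a mean time to failure n times shorter than a single component. Taking 5 years as 1,825 days (365-day years, an assumption of this sketch) and dividing by 100 gives 18.25 days. The helper names below are invented for illustration. The exact probability that at least one of n components fails, 1 - (1 - p)^n, is close to n * p when p is small, which is the linear growth described above.

```python
DAYS_PER_YEAR = 365  # assumption: ignore leap years


def system_mttf_days(component_mttf_years: float, num_components: int) -> float:
    """Mean time to failure of a system that stops if any one component fails.

    Assumes independent components with small, equal failure rates, so the
    rates add and the mean time to failure shrinks by num_components.
    """
    return component_mttf_years * DAYS_PER_YEAR / num_components


def prob_any_failure(p: float, n: int) -> float:
    """Exact probability that at least one of n independent components fails,
    each failing with probability p over the same period."""
    return 1 - (1 - p) ** n


if __name__ == "__main__":
    print(system_mttf_days(5, 100))  # 18.25
    # For small p, 1 - (1 - p)^n is approximately n * p (linear in n).
    print(prob_any_failure(1e-4, 100), 100 * 1e-4)
```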

Therefore, large-scale parallel database systems, such as Compaq Himalaya, Teradata, and Informix XPS (now a division of IBM), are designed to operate even if a processor or disk fails. Data are replicated across at least two processors. If a processor fails, the data that it stored can still be accessed from the other processors. The system keeps track of failed processors and distributes the work among functioning processors. Requests for data stored at the failed site are automatically routed to the backup sites that store a replica of the data. If all the data of a processor A are replicated at a single processor B, B will have to handle all the requests to A as well as those to itself, and that will result in B becoming a bottleneck. Therefore, the replicas of the data of a processor are partitioned across multiple other processors.
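A small sketch of spreading one processor's replicas over many others, under a hypothetical placement rule that is not the scheme of any system named above. Each processor's data is cut into n - 1 chunks, and chunk j of processor i is replicated on processor (i + 1 + j) mod n. `backup_processor` and `route` are invented names. Requests go to the owner while it is up and to the chunk's backup after it fails, so a failed processor's load is shared by all the others instead of landing on a single partner.

```python
from typing import Set


def backup_processor(owner: int, chunk: int, num_processors: int) -> int:
    """Processor holding the replica of `chunk` of `owner`'s data.

    Hypothetical scheme (needs num_processors >= 2): the owner's data is cut
    into num_processors - 1 chunks, and chunk j is replicated on processor
    (owner + 1 + j) mod num_processors, so each other processor gets one chunk.
    """
    return (owner + 1 + chunk % (num_processors - 1)) % num_processors


def route(owner: int, chunk: int, num_processors: int, failed: Set[int]) -> int:
    """Send a request to the owner if it is up, otherwise to the backup copy."""
    if owner not in failed:
        return owner
    backup = backup_processor(owner, chunk, num_processors)
    if backup in failed:
        raise RuntimeError("both copies of this chunk are unavailable")
    return backup


if __name__ == "__main__":
    # With 4 processors and processor 0 down, its three chunks are served by
    # processors 1, 2 and 3 rather than all by one backup.
    print([route(0, c, 4, failed={0}) for c in range(3)])  # [1, 2, 3]
```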

When we are dealing with large volumes of data (ranging in the terabytes), simple operations, such as creating indices, and changes to schema, such as adding a column to a relation, can take a long time — perhaps hours or even days. Therefore, it is unacceptable for the database system to be unavailable while such operations are in progress. Many parallel database systems, such as the Compaq Himalaya systems, allow such operations to be performed online, that is, while the system is executing other transactions.

Consider, for instance, online index construction. A system that supports this feature allows insertions, deletions, and updates on a relation even as an index is being built on the relation. The index-building operation therefore cannot lock the entire relation in shared mode, as it would have done otherwise. Instead, the process keeps track of updates that occur while it is active, and incorporates the changes into the index being constructed.
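The following single-threaded sketch illustrates the "scan, then apply the changes made meanwhile" idea. The class `OnlineIndexBuild`, its side log, and the `on_row_scanned` callback (which stands in for concurrent transactions) are hypothetical. The relation maps row ids to the indexed key, and the index maps each key to a set of row ids. While the build is active, every write is applied to the relation and also appended to the side log as (row id, old key, new key). After the scan, the log is replayed in order. Because each entry removes the old key and adds the new one, the index ends up matching the relation's final state whether the scan saw a row before or after it changed. A real system must also make the final replay and the switch to ordinary index maintenance atomic with respect to writers, which this sketch gets for free by being single-threaded.

```python
from typing import Callable, Dict, List, Optional, Set, Tuple


class OnlineIndexBuild:
    """Hypothetical sketch: build an index while writes continue."""

    def __init__(self, relation: Dict[int, str]):
        self.relation = relation  # row id -> indexed key
        self.active = False       # True while the build is running
        self.side_log: List[Tuple[int, Optional[str], Optional[str]]] = []
        self.index: Dict[str, Set[int]] = {}

    def write(self, rid: int, new_key: Optional[str]) -> None:
        """Insert/update a row (new_key None means delete); log it if building."""
        old_key = self.relation.get(rid)
        if new_key is None:
            self.relation.pop(rid, None)
        else:
            self.relation[rid] = new_key
        if self.active:
            self.side_log.append((rid, old_key, new_key))

    def build(self, on_row_scanned: Optional[Callable[[int], None]] = None
              ) -> Dict[str, Set[int]]:
        self.active = True  # no relation-wide shared lock: log writes instead
        for rid in list(self.relation):     # row ids present at scan start
            key = self.relation.get(rid)
            if key is not None:              # row may have been deleted meanwhile
                self.index.setdefault(key, set()).add(rid)
            if on_row_scanned:
                on_row_scanned(rid)          # concurrent transactions run here
        # Incorporate changes made during the scan, in the order they happened.
        for rid, old_key, new_key in self.side_log:
            if old_key is not None:
                self.index.get(old_key, set()).discard(rid)
            if new_key is not None:
                self.index.setdefault(new_key, set()).add(rid)
        self.active = False
        self.index = {k: v for k, v in self.index.items() if v}  # drop empties
        return self.index
```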
